From 33a71b35301df08754a1fb68cd967dd82937a974 Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Wed, 26 Mar 2025 17:21:34 -0700 Subject: [PATCH 01/15] update types --- mapillary_tools/upload_api_v4.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/mapillary_tools/upload_api_v4.py b/mapillary_tools/upload_api_v4.py index 6a1148722..980b753e4 100644 --- a/mapillary_tools/upload_api_v4.py +++ b/mapillary_tools/upload_api_v4.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import enum import io import os @@ -36,12 +38,12 @@ class ClusterFileType(enum.Enum): class UploadService: user_access_token: str session_key: str - callbacks: T.List[T.Callable[[bytes, T.Optional[requests.Response]], None]] + callbacks: list[T.Callable[[bytes, requests.Response | None], None]] cluster_filetype: ClusterFileType - organization_id: T.Optional[T.Union[str, int]] + organization_id: str | int | None chunk_size: int - MIME_BY_CLUSTER_TYPE: T.Dict[ClusterFileType, str] = { + MIME_BY_CLUSTER_TYPE: dict[ClusterFileType, str] = { ClusterFileType.ZIP: "application/zip", ClusterFileType.BLACKVUE: "video/mp4", ClusterFileType.CAMM: "video/mp4", @@ -51,7 +53,7 @@ def __init__( self, user_access_token: str, session_key: str, - organization_id: T.Optional[T.Union[str, int]] = None, + organization_id: str | int | None = None, cluster_filetype: ClusterFileType = ClusterFileType.ZIP, chunk_size: int = DEFAULT_CHUNK_SIZE, ): @@ -83,7 +85,7 @@ def fetch_offset(self) -> int: def upload( self, data: T.IO[bytes], - offset: T.Optional[int] = None, + offset: int | None = None, ) -> str: chunks = self._chunkize_byte_stream(data) return self.upload_chunks(chunks, offset=offset) @@ -123,7 +125,7 @@ def _attach_callbacks( def upload_chunks( self, chunks: T.Iterable[bytes], - offset: T.Optional[int] = None, + offset: int | None = None, ) -> str: if offset is None: offset = self.fetch_offset() @@ -158,7 +160,7 @@ def finish(self, file_handle: str) -> str: headers = { "Authorization": f"OAuth {self.user_access_token}", } - data: T.Dict[str, T.Union[str, int]] = { + data: dict[str, str | int] = { "file_handle": file_handle, "file_type": self.cluster_filetype.value, } @@ -199,7 +201,7 @@ def __init__(self, *args, **kwargs): def upload_chunks( self, chunks: T.Iterable[bytes], - offset: T.Optional[int] = None, + offset: int | None = None, ) -> str: if offset is None: offset = self.fetch_offset() From 9699b2af95310d40b31164b7f846d3444e5ab850 Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Thu, 27 Mar 2025 11:33:56 -0700 Subject: [PATCH 02/15] support chunks directly in upload_api_v4 --- mapillary_tools/upload_api_v4.py | 70 ++++++++++++++------------------ tests/cli/upload_api_v4.py | 38 +++++++++-------- tests/unit/test_upload_api_v4.py | 10 ++--- 3 files changed, 57 insertions(+), 61 deletions(-) diff --git a/mapillary_tools/upload_api_v4.py b/mapillary_tools/upload_api_v4.py index 980b753e4..bee33da1f 100644 --- a/mapillary_tools/upload_api_v4.py +++ b/mapillary_tools/upload_api_v4.py @@ -19,7 +19,6 @@ MAPILLARY_UPLOAD_ENDPOINT = os.getenv( "MAPILLARY_UPLOAD_ENDPOINT", "https://rupload.facebook.com/mapillary_public_uploads" ) -DEFAULT_CHUNK_SIZE = 1024 * 1024 * 16 # 16MB # According to the docs, UPLOAD_REQUESTS_TIMEOUT can be a tuple of # (connection_timeout, read_timeout): https://requests.readthedocs.io/en/latest/user/advanced/#timeouts # In my test, however, the connection_timeout rules both connection timeout and read timeout. 
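A note on the UPLOAD_REQUESTS_TIMEOUT comment above: the requests library's `timeout` argument accepts either a single number, applied to both the connect and the read phase, or a (connect_timeout, read_timeout) tuple. A minimal sketch of both forms (the URL is a placeholder, not part of the patch):

    import requests

    # Single value: used as both the connect timeout and the read timeout.
    requests.get("https://example.com", timeout=30)

    # Tuple form: (connect_timeout, read_timeout), the shape that
    # UPLOAD_REQUESTS_TIMEOUT = (30 * 60, 30 * 60) uses in this module.
    requests.get("https://example.com", timeout=(5, 60))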
@@ -38,7 +37,6 @@ class ClusterFileType(enum.Enum): class UploadService: user_access_token: str session_key: str - callbacks: list[T.Callable[[bytes, requests.Response | None], None]] cluster_filetype: ClusterFileType organization_id: str | int | None chunk_size: int @@ -55,18 +53,12 @@ def __init__( session_key: str, organization_id: str | int | None = None, cluster_filetype: ClusterFileType = ClusterFileType.ZIP, - chunk_size: int = DEFAULT_CHUNK_SIZE, ): - if chunk_size <= 0: - raise ValueError("Expect positive chunk size") - self.user_access_token = user_access_token self.session_key = session_key self.organization_id = organization_id # validate the input self.cluster_filetype = ClusterFileType(cluster_filetype) - self.callbacks = [] - self.chunk_size = chunk_size def fetch_offset(self) -> int: headers = { @@ -82,24 +74,19 @@ def fetch_offset(self) -> int: data = resp.json() return data["offset"] - def upload( - self, - data: T.IO[bytes], - offset: int | None = None, - ) -> str: - chunks = self._chunkize_byte_stream(data) - return self.upload_chunks(chunks, offset=offset) - - def _chunkize_byte_stream( - self, stream: T.IO[bytes] + @classmethod + def chunkize_byte_stream( + cls, stream: T.IO[bytes], chunk_size: int ) -> T.Generator[bytes, None, None]: + if chunk_size <= 0: + raise ValueError("Expect positive chunk size") while True: - data = stream.read(self.chunk_size) + data = stream.read(chunk_size) if not data: break yield data - def _offset_chunks( + def shift_chunks( self, chunks: T.Iterable[bytes], offset: int ) -> T.Generator[bytes, None, None]: assert offset >= 0, f"Expect non-negative offset but got {offset}" @@ -114,14 +101,6 @@ def _offset_chunks( else: yield chunk - def _attach_callbacks( - self, chunks: T.Iterable[bytes] - ) -> T.Generator[bytes, None, None]: - for chunk in chunks: - yield chunk - for callback in self.callbacks: - callback(chunk, None) - def upload_chunks( self, chunks: T.Iterable[bytes], @@ -129,8 +108,17 @@ def upload_chunks( ) -> str: if offset is None: offset = self.fetch_offset() + shifted_chunks = self.shift_chunks(chunks, offset) + return self.upload_shifted_chunks(shifted_chunks, offset) - chunks = self._attach_callbacks(self._offset_chunks(chunks, offset)) + def upload_shifted_chunks( + self, + shifted_chunks: T.Iterable[bytes], + offset: int, + ) -> str: + """ + Upload the chunks that must already be shifted by the offset (e.g. 
fp.seek(begin_offset, io.SEEK_SET)) + """ headers = { "Authorization": f"OAuth {self.user_access_token}", @@ -142,7 +130,7 @@ def upload_chunks( resp = request_post( url, headers=headers, - data=chunks, + data=shifted_chunks, timeout=UPLOAD_REQUESTS_TIMEOUT, ) @@ -196,22 +184,24 @@ def __init__(self, *args, **kwargs): self._upload_path = os.getenv( "MAPILLARY_UPLOAD_PATH", "mapillary_public_uploads" ) - self._error_ratio = 0.1 + self._error_ratio = 0.02 - def upload_chunks( + @T.override + def upload_shifted_chunks( self, - chunks: T.Iterable[bytes], - offset: int | None = None, + shifted_chunks: T.Iterable[bytes], + offset: int, ) -> str: - if offset is None: - offset = self.fetch_offset() - - chunks = self._attach_callbacks(self._offset_chunks(chunks, offset)) + expected_offset = self.fetch_offset() + if offset != expected_offset: + raise ValueError( + f"Expect offset {expected_offset} but got {offset} for session {self.session_key}" + ) os.makedirs(self._upload_path, exist_ok=True) filename = os.path.join(self._upload_path, self.session_key) with open(filename, "ab") as fp: - for chunk in chunks: + for chunk in shifted_chunks: if random.random() <= self._error_ratio: raise requests.ConnectionError( f"TEST ONLY: Failed to upload with error ratio {self._error_ratio}" @@ -223,9 +213,11 @@ def upload_chunks( ) return uuid.uuid4().hex + @T.override def finish(self, _: str) -> str: return "0" + @T.override def fetch_offset(self) -> int: if random.random() <= self._error_ratio: raise requests.ConnectionError( diff --git a/tests/cli/upload_api_v4.py b/tests/cli/upload_api_v4.py index 1d1ceaf51..a14846413 100644 --- a/tests/cli/upload_api_v4.py +++ b/tests/cli/upload_api_v4.py @@ -8,7 +8,7 @@ import tqdm from mapillary_tools import api_v4, authenticate -from mapillary_tools.upload_api_v4 import DEFAULT_CHUNK_SIZE, UploadService +from mapillary_tools.upload_api_v4 import UploadService, FakeUploadService LOG = logging.getLogger("mapillary_tools") @@ -39,9 +39,10 @@ def _parse_args(): parser.add_argument( "--chunk_size", type=float, - default=DEFAULT_CHUNK_SIZE / (1024 * 1024), + default=2, help="chunk size in megabytes", ) + parser.add_argument("--dry_run", action="store_true", default=False) parser.add_argument("filename") parser.add_argument("session_key") return parser.parse_args() @@ -60,17 +61,13 @@ def main(): user_items = authenticate.fetch_user_items(parsed.user_name) session_key = parsed.session_key + chunk_size = int(parsed.chunk_size * 1024 * 1024) user_access_token = user_items.get("user_upload_token", "") - service = UploadService( - user_access_token, - session_key, - entity_size, - chunk_size=( - int(parsed.chunk_size * 1024 * 1024) - if parsed.chunk_size is not None - else DEFAULT_CHUNK_SIZE - ), - ) + + if parsed.dry_run: + service = FakeUploadService(user_access_token, session_key) + else: + service = UploadService(user_access_token, session_key) try: initial_offset = service.fetch_offset() @@ -80,9 +77,18 @@ def main(): LOG.info("Session key: %s", session_key) LOG.info("Initial offset: %s", initial_offset) LOG.info("Entity size: %d", entity_size) - LOG.info("Chunk size: %s MB", service.chunk_size / (1024 * 1024)) + LOG.info("Chunk size: %s MB", chunk_size / (1024 * 1024)) + + def _update_pbar(chunks, pbar): + for chunk in chunks: + yield chunk + pbar.update(len(chunk)) with open(parsed.filename, "rb") as fp: + fp.seek(initial_offset, io.SEEK_SET) + + shifted_chunks = service.chunkize_byte_stream(fp, chunk_size) + with tqdm.tqdm( total=entity_size, initial=initial_offset, @@ 
-91,9 +97,10 @@ def main(): unit_divisor=1024, disable=LOG.getEffectiveLevel() <= logging.DEBUG, ) as pbar: - service.callbacks.append(lambda chunk, resp: pbar.update(len(chunk))) try: - file_handle = service.upload(fp, initial_offset) + file_handle = service.upload_shifted_chunks( + _update_pbar(shifted_chunks, pbar), initial_offset + ) except requests.HTTPError as ex: raise RuntimeError(api_v4.readable_http_error(ex)) except KeyboardInterrupt: @@ -107,7 +114,6 @@ def main(): LOG.info("Final offset: %s", final_offset) LOG.info("Entity size: %d", entity_size) - LOG.info("File handle: %s", file_handle) diff --git a/tests/unit/test_upload_api_v4.py b/tests/unit/test_upload_api_v4.py index 8d9bc884c..3857f522f 100644 --- a/tests/unit/test_upload_api_v4.py +++ b/tests/unit/test_upload_api_v4.py @@ -11,16 +11,15 @@ def test_upload(setup_upload: py.path.local): upload_service = upload_api_v4.FakeUploadService( user_access_token="TEST", session_key="FOOBAR.txt", - chunk_size=1, ) upload_service._error_ratio = 0 content = b"double_foobar" - cluster_id = upload_service.upload(io.BytesIO(content)) + cluster_id = upload_service.upload_byte_stream(io.BytesIO(content), chunk_size=1) assert isinstance(cluster_id, str), cluster_id assert (setup_upload.join("FOOBAR.txt").read_binary()) == content # reupload should not affect the file - upload_service.upload(io.BytesIO(content)) + upload_service.upload_byte_stream(io.BytesIO(content), chunk_size=1) assert (setup_upload.join("FOOBAR.txt").read_binary()) == content @@ -28,16 +27,15 @@ def test_upload_big_chunksize(setup_upload: py.path.local): upload_service = upload_api_v4.FakeUploadService( user_access_token="TEST", session_key="FOOBAR.txt", - chunk_size=1000, ) upload_service._error_ratio = 0 content = b"double_foobar" - cluster_id = upload_service.upload(io.BytesIO(content)) + cluster_id = upload_service.upload_byte_stream(io.BytesIO(content), chunk_size=1000) assert isinstance(cluster_id, str), cluster_id assert (setup_upload.join("FOOBAR.txt").read_binary()) == content # reupload should not affect the file - upload_service.upload(io.BytesIO(content)) + upload_service.upload_byte_stream(io.BytesIO(content), chunk_size=1000) assert (setup_upload.join("FOOBAR.txt").read_binary()) == content From 8b86858d4d402339422ab0d295ceba8a13b613ec Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Thu, 27 Mar 2025 11:54:34 -0700 Subject: [PATCH 03/15] move finish_upload() to api_v4.py --- mapillary_tools/api_v4.py | 34 +++++++++++++++++++ mapillary_tools/upload_api_v4.py | 58 ++++++-------------------------- 2 files changed, 45 insertions(+), 47 deletions(-) diff --git a/mapillary_tools/api_v4.py b/mapillary_tools/api_v4.py index 6e5088a7c..171587d9c 100644 --- a/mapillary_tools/api_v4.py +++ b/mapillary_tools/api_v4.py @@ -2,6 +2,7 @@ import os import ssl import typing as T +import enum from json import dumps import requests @@ -18,6 +19,12 @@ USE_SYSTEM_CERTS: bool = False +class ClusterFileType(enum.Enum): + ZIP = "zip" + BLACKVUE = "mly_blackvue_video" + CAMM = "mly_camm_video" + + class HTTPSystemCertsAdapter(HTTPAdapter): """ This adapter uses the system's certificate store instead of the certifi module. 
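The ClusterFileType enum added to api_v4.py above carries the wire-format strings that the upload endpoints expect. A small usage sketch (illustrative only, not part of the patch):

    from mapillary_tools.api_v4 import ClusterFileType

    # Members map to the file_type values sent to the Graph API:
    assert ClusterFileType.ZIP.value == "zip"
    assert ClusterFileType.CAMM.value == "mly_camm_video"

    # Constructing from a raw value validates it, which is how
    # UploadService.__init__ checks its cluster_filetype argument:
    assert ClusterFileType("mly_blackvue_video") is ClusterFileType.BLACKVUE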
@@ -334,3 +341,30 @@ def log_event(action_type: ActionType, properties: T.Dict) -> requests.Response: ) resp.raise_for_status() return resp + + +def finish_upload( + user_access_token: str, + file_handle: str, + cluster_filetype: ClusterFileType, + organization_id: int | str | None = None, +) -> requests.Response: + data: dict[str, str | int] = { + "file_handle": file_handle, + "file_type": cluster_filetype.value, + } + if organization_id is not None: + data["organization_id"] = organization_id + + resp = request_post( + f"{MAPILLARY_GRAPH_API_ENDPOINT}/finish_upload", + headers={ + "Authorization": f"OAuth {user_access_token}", + }, + json=data, + timeout=REQUESTS_TIMEOUT, + ) + + resp.raise_for_status() + + return resp diff --git a/mapillary_tools/upload_api_v4.py b/mapillary_tools/upload_api_v4.py index bee33da1f..5b12ed1b5 100644 --- a/mapillary_tools/upload_api_v4.py +++ b/mapillary_tools/upload_api_v4.py @@ -1,6 +1,5 @@ from __future__ import annotations -import enum import io import os import random @@ -10,10 +9,10 @@ import requests from .api_v4 import ( - MAPILLARY_GRAPH_API_ENDPOINT, request_get, request_post, REQUESTS_TIMEOUT, + ClusterFileType, ) MAPILLARY_UPLOAD_ENDPOINT = os.getenv( @@ -28,17 +27,10 @@ UPLOAD_REQUESTS_TIMEOUT = (30 * 60, 30 * 60) # 30 minutes -class ClusterFileType(enum.Enum): - ZIP = "zip" - BLACKVUE = "mly_blackvue_video" - CAMM = "mly_camm_video" - - class UploadService: user_access_token: str session_key: str cluster_filetype: ClusterFileType - organization_id: str | int | None chunk_size: int MIME_BY_CLUSTER_TYPE: dict[ClusterFileType, str] = { @@ -51,12 +43,10 @@ def __init__( self, user_access_token: str, session_key: str, - organization_id: str | int | None = None, cluster_filetype: ClusterFileType = ClusterFileType.ZIP, ): self.user_access_token = user_access_token self.session_key = session_key - self.organization_id = organization_id # validate the input self.cluster_filetype = ClusterFileType(cluster_filetype) @@ -101,6 +91,16 @@ def shift_chunks( else: yield chunk + def upload_byte_stream( + self, + stream: T.IO[bytes], + offset: int | None = None, + chunk_size: int = 2 * 1024 * 1024, + ) -> str: + if offset is None: + offset = self.fetch_offset() + return self.upload_chunks(self.chunkize_byte_stream(stream, chunk_size), offset) + def upload_chunks( self, chunks: T.Iterable[bytes], @@ -144,38 +144,6 @@ def upload_shifted_chunks( f"Upload server error: File handle not found in the upload response {resp.text}" ) - def finish(self, file_handle: str) -> str: - headers = { - "Authorization": f"OAuth {self.user_access_token}", - } - data: dict[str, str | int] = { - "file_handle": file_handle, - "file_type": self.cluster_filetype.value, - } - if self.organization_id is not None: - data["organization_id"] = self.organization_id - - url = f"{MAPILLARY_GRAPH_API_ENDPOINT}/finish_upload" - - resp = request_post( - url, - headers=headers, - json=data, - timeout=REQUESTS_TIMEOUT, - ) - - resp.raise_for_status() - - data = resp.json() - - cluster_id = data.get("cluster_id") - if cluster_id is None: - raise RuntimeError( - f"Upload server error: failed to create the cluster {resp.text}" - ) - - return T.cast(str, cluster_id) - # A mock class for testing only class FakeUploadService(UploadService): @@ -213,10 +181,6 @@ def upload_shifted_chunks( ) return uuid.uuid4().hex - @T.override - def finish(self, _: str) -> str: - return "0" - @T.override def fetch_offset(self) -> int: if random.random() <= self._error_ratio: From 99284115f3c6388df1759a86d54b4be536a3bb6c 
Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Thu, 27 Mar 2025 12:27:21 -0700 Subject: [PATCH 04/15] update uploader.py --- mapillary_tools/api_v4.py | 2 +- mapillary_tools/authenticate.py | 3 +- mapillary_tools/upload.py | 2 +- mapillary_tools/upload_api_v4.py | 7 +- mapillary_tools/uploader.py | 280 +++++++++++++++++-------------- tests/cli/upload_api_v4.py | 2 +- 6 files changed, 159 insertions(+), 137 deletions(-) diff --git a/mapillary_tools/api_v4.py b/mapillary_tools/api_v4.py index 171587d9c..6bdc041c4 100644 --- a/mapillary_tools/api_v4.py +++ b/mapillary_tools/api_v4.py @@ -1,8 +1,8 @@ +import enum import logging import os import ssl import typing as T -import enum from json import dumps import requests diff --git a/mapillary_tools/authenticate.py b/mapillary_tools/authenticate.py index af956f5e2..f58a1f220 100644 --- a/mapillary_tools/authenticate.py +++ b/mapillary_tools/authenticate.py @@ -7,9 +7,10 @@ import sys import typing as T -import requests import jsonschema +import requests + from . import api_v4, config, constants, exceptions, types diff --git a/mapillary_tools/upload.py b/mapillary_tools/upload.py index 055b64a9b..33b03a016 100644 --- a/mapillary_tools/upload.py +++ b/mapillary_tools/upload.py @@ -516,7 +516,7 @@ def _upload_everything( T.cast(T.BinaryIO, camm_fp), upload_api_v4.ClusterFileType.CAMM, video_metadata.md5sum, - event_payload=event_payload, + progress=event_payload, ) except Exception as ex: raise UploadError(ex) from ex diff --git a/mapillary_tools/upload_api_v4.py b/mapillary_tools/upload_api_v4.py index 5b12ed1b5..b9088ca8b 100644 --- a/mapillary_tools/upload_api_v4.py +++ b/mapillary_tools/upload_api_v4.py @@ -8,12 +8,7 @@ import requests -from .api_v4 import ( - request_get, - request_post, - REQUESTS_TIMEOUT, - ClusterFileType, -) +from .api_v4 import ClusterFileType, request_get, request_post, REQUESTS_TIMEOUT MAPILLARY_UPLOAD_ENDPOINT = os.getenv( "MAPILLARY_UPLOAD_ENDPOINT", "https://rupload.facebook.com/mapillary_public_uploads" diff --git a/mapillary_tools/uploader.py b/mapillary_tools/uploader.py index c9ae220d6..66fdf5963 100644 --- a/mapillary_tools/uploader.py +++ b/mapillary_tools/uploader.py @@ -15,7 +15,7 @@ import jsonschema import requests -from . import constants, exif_write, types, upload_api_v4, utils +from . 
import api_v4, constants, exif_write, types, upload_api_v4, utils LOG = logging.getLogger(__name__) @@ -28,6 +28,8 @@ class Progress(T.TypedDict, total=False): # File type file_type: str + begin_offset: int + # How many bytes has been uploaded so far since "upload_start" offset: int @@ -74,7 +76,7 @@ class UploadCancelled(Exception): class EventEmitter: - events: dict[EventName, T.List] + events: dict[EventName, list] def __init__(self): self.events = {} @@ -195,7 +197,7 @@ def __init__( self, user_items: types.UserItem, emitter: EventEmitter | None = None, - chunk_size: int = upload_api_v4.DEFAULT_CHUNK_SIZE, + chunk_size: int = 2 * 1024 * 1024, # 2MB dry_run=False, ): jsonschema.validate(instance=user_items, schema=types.UserItemSchema) @@ -219,7 +221,7 @@ def upload_zipfile( return None final_event_payload: Progress = { - **event_payload, # type: ignore + # **event_payload, # type: ignore "sequence_image_count": len(namelist), } @@ -235,7 +237,7 @@ def upload_zipfile( zip_fp, upload_api_v4.ClusterFileType.ZIP, upload_md5sum, - event_payload=final_event_payload, + progress=final_event_payload, ) def upload_images( @@ -269,15 +271,88 @@ def upload_images( ret[sequence_uuid] = cluster_id return ret + def _create_upload_service( + self, session_key: str, cluster_filetype: upload_api_v4.ClusterFileType + ) -> upload_api_v4.UploadService: + upload_service: upload_api_v4.UploadService + + if self.dry_run: + upload_service = upload_api_v4.FakeUploadService( + user_access_token=self.user_items["user_upload_token"], + session_key=session_key, + cluster_filetype=cluster_filetype, + ) + else: + upload_service = upload_api_v4.UploadService( + user_access_token=self.user_items["user_upload_token"], + session_key=session_key, + cluster_filetype=cluster_filetype, + ) + + return upload_service + + def _handle_upload_exception(self, ex: Exception, progress: Progress) -> None: + retries = progress["retries"] + begin_offset = progress.get("begin_offset") + chunk_size = progress["chunk_size"] + + if retries <= constants.MAX_UPLOAD_RETRIES and _is_retriable_exception(ex): + if self.emitter: + self.emitter.emit("upload_interrupted", progress) + LOG.warning( + # use %s instead of %d because offset could be None + "Error uploading chunk_size %d at begin_offset %s: %s: %s", + chunk_size, + begin_offset, + ex.__class__.__name__, + str(ex), + ) + # Keep things immutable here. 
Will increment retries in the caller + retries += 1 + if _is_immediate_retry(ex): + sleep_for = 0 + else: + sleep_for = min(2**retries, 16) + LOG.info( + "Retrying in %d seconds (%d/%d)", + sleep_for, + retries, + constants.MAX_UPLOAD_RETRIES, + ) + if sleep_for: + time.sleep(sleep_for) + else: + raise ex + + def _finish_upload_retryable( + self, upload_service: upload_api_v4.UploadService, file_handle: str + ): + if self.dry_run: + cluster_id = "0" + else: + resp = api_v4.finish_upload( + self.user_items["user_upload_token"], + file_handle, + upload_service.cluster_filetype, + organization_id=self.user_items.get("MAPOrganizationKey"), + ) + + data = resp.json() + cluster_id = data.get("cluster_id") + + # TODO: validate cluster_id + + return cluster_id + def upload_stream( self, fp: T.IO[bytes], cluster_filetype: upload_api_v4.ClusterFileType, upload_md5sum: str, - event_payload: Progress | None = None, + progress: Progress | None = None, ) -> str | None: - if event_payload is None: - event_payload = {} + if progress is None: + progress = {} fp.seek(0, io.SEEK_END) entity_size = fp.tell() @@ -289,40 +364,79 @@ def upload_stream( } session_key = f"mly_tools_{upload_md5sum}{SUFFIX_MAP[cluster_filetype]}" - if self.dry_run: - upload_service: upload_api_v4.UploadService = ( - upload_api_v4.FakeUploadService( - user_access_token=self.user_items["user_upload_token"], - session_key=session_key, - organization_id=self.user_items.get("MAPOrganizationKey"), - cluster_filetype=cluster_filetype, - chunk_size=self.chunk_size, + upload_service = self._create_upload_service(session_key, cluster_filetype) + + progress["entity_size"] = entity_size + progress["md5sum"] = upload_md5sum + progress["chunk_size"] = self.chunk_size + progress["retries"] = 0 + + if self.emitter: + try: + self.emitter.emit("upload_start", progress) + except UploadCancelled: + # throw in upload_start only + return None + + while True: + try: + file_handle = self._upload_stream_retryable( + upload_service, fp, progress ) - ) - else: - upload_service = upload_api_v4.UploadService( - user_access_token=self.user_items["user_upload_token"], - session_key=session_key, - organization_id=self.user_items.get("MAPOrganizationKey"), - cluster_filetype=cluster_filetype, - chunk_size=self.chunk_size, - ) + except Exception as ex: + self._handle_upload_exception(ex, progress) + else: + break - final_event_payload: Progress = { - **event_payload, # type: ignore - "entity_size": entity_size, - "md5sum": upload_md5sum, - } + progress["retries"] += 1 - try: - return _upload_stream_with_retries( - upload_service, - fp, - event_payload=final_event_payload, - emitter=self.emitter, - ) - except UploadCancelled: - return None + if self.emitter: + self.emitter.emit("upload_end", progress) + + # TODO: retry here + cluster_id = self._finish_upload_retryable(upload_service, file_handle) + + if self.emitter: + self.emitter.emit("upload_finished", progress) + + return cluster_id + + def _chunkize_byte_stream( + self, + stream: T.IO[bytes], + progress: Progress, + ) -> T.Generator[bytes, None, None]: + while True: + data = stream.read(self.chunk_size) + if not data: + break + yield data + progress["offset"] += len(data) + progress["chunk_size"] = len(data) + if self.emitter: + self.emitter.emit("upload_progress", progress) + + def _upload_stream_retryable( + self, + upload_service: upload_api_v4.UploadService, + fp: T.IO[bytes], + progress: Progress, + ) -> str: + begin_offset = upload_service.fetch_offset() + + progress["begin_offset"] = begin_offset + 
progress["offset"] = begin_offset + + if self.emitter: + self.emitter.emit("upload_fetch_offset", progress) + + fp.seek(begin_offset, io.SEEK_SET) + + shifted_chunks = self._chunkize_byte_stream(fp, progress) + + file_handle = upload_service.upload_shifted_chunks(shifted_chunks, begin_offset) + + return file_handle def _validate_metadatas(metadatas: T.Sequence[types.ImageMetadata]): @@ -384,91 +498,3 @@ def _is_retriable_exception(ex: Exception): return True return False - - -def _setup_callback(emitter: EventEmitter, mutable_payload: Progress): - def _callback(chunk: bytes, _): - assert isinstance(emitter, EventEmitter) - mutable_payload["offset"] += len(chunk) - mutable_payload["chunk_size"] = len(chunk) - emitter.emit("upload_progress", mutable_payload) - - return _callback - - -def _upload_stream_with_retries( - upload_service: upload_api_v4.UploadService, - fp: T.IO[bytes], - event_payload: Progress | None = None, - emitter: EventEmitter | None = None, -) -> str: - retries = 0 - - if event_payload is None: - event_payload = {} - - mutable_payload = T.cast(Progress, {**event_payload}) - - # when it progresses, we reset retries - def _reset_retries(_, __): - nonlocal retries - retries = 0 - - if emitter: - emitter.emit("upload_start", mutable_payload) - - while True: - fp.seek(0, io.SEEK_SET) - begin_offset: int | None = None - try: - begin_offset = upload_service.fetch_offset() - upload_service.callbacks = [_reset_retries] - if emitter: - mutable_payload["offset"] = begin_offset - mutable_payload["retries"] = retries - emitter.emit("upload_fetch_offset", mutable_payload) - upload_service.callbacks.append( - _setup_callback(emitter, mutable_payload) - ) - file_handle = upload_service.upload(fp, offset=begin_offset) - except Exception as ex: - if retries < constants.MAX_UPLOAD_RETRIES and _is_retriable_exception(ex): - if emitter: - emitter.emit("upload_interrupted", mutable_payload) - LOG.warning( - # use %s instead of %d because offset could be None - "Error uploading chunk_size %d at begin_offset %s: %s: %s", - upload_service.chunk_size, - begin_offset, - ex.__class__.__name__, - str(ex), - ) - retries += 1 - if _is_immediate_retry(ex): - sleep_for = 0 - else: - sleep_for = min(2**retries, 16) - LOG.info( - "Retrying in %d seconds (%d/%d)", - sleep_for, - retries, - constants.MAX_UPLOAD_RETRIES, - ) - if sleep_for: - time.sleep(sleep_for) - else: - raise ex - else: - break - - if emitter: - emitter.emit("upload_end", mutable_payload) - - # TODO: retry here - cluster_id = upload_service.finish(file_handle) - - if emitter: - mutable_payload["cluster_id"] = cluster_id - emitter.emit("upload_finished", mutable_payload) - - return cluster_id diff --git a/tests/cli/upload_api_v4.py b/tests/cli/upload_api_v4.py index a14846413..24d33c88f 100644 --- a/tests/cli/upload_api_v4.py +++ b/tests/cli/upload_api_v4.py @@ -8,7 +8,7 @@ import tqdm from mapillary_tools import api_v4, authenticate -from mapillary_tools.upload_api_v4 import UploadService, FakeUploadService +from mapillary_tools.upload_api_v4 import FakeUploadService, UploadService LOG = logging.getLogger("mapillary_tools") From 65b2f70c616e05af9be604e6b90e66542f23b351 Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Thu, 27 Mar 2025 12:37:33 -0700 Subject: [PATCH 05/15] fix types --- mapillary_tools/api_v4.py | 16 +++++++++------- mapillary_tools/upload_api_v4.py | 10 ++++++++-- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/mapillary_tools/api_v4.py b/mapillary_tools/api_v4.py index 6bdc041c4..9feee7d55 100644 --- 
a/mapillary_tools/api_v4.py +++ b/mapillary_tools/api_v4.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import enum import logging import os @@ -100,9 +102,9 @@ def _sanitize(headers: T.Mapping[T.Any, T.Any]) -> T.Mapping[T.Any, T.Any]: def _log_debug_request( method: str, url: str, - json: T.Optional[T.Dict] = None, - params: T.Optional[T.Dict] = None, - headers: T.Optional[T.Dict] = None, + json: dict | None = None, + params: dict | None = None, + headers: dict | None = None, timeout: T.Any = None, ): if logging.getLogger().getEffectiveLevel() <= logging.DEBUG: @@ -157,8 +159,8 @@ def readable_http_error(ex: requests.HTTPError) -> str: def request_post( url: str, - data: T.Optional[T.Any] = None, - json: T.Optional[dict] = None, + data: T.Any | None = None, + json: dict | None = None, **kwargs, ) -> requests.Response: global USE_SYSTEM_CERTS @@ -197,7 +199,7 @@ def request_post( def request_get( url: str, - params: T.Optional[dict] = None, + params: dict | None = None, **kwargs, ) -> requests.Response: global USE_SYSTEM_CERTS @@ -300,7 +302,7 @@ def fetch_organization( def fetch_user_or_me( user_access_token: str, - user_id: T.Optional[T.Union[int, str]] = None, + user_id: int | str | None = None, ) -> requests.Response: if user_id is None: url = f"{MAPILLARY_GRAPH_API_ENDPOINT}/me" diff --git a/mapillary_tools/upload_api_v4.py b/mapillary_tools/upload_api_v4.py index b9088ca8b..b0a7cc8c9 100644 --- a/mapillary_tools/upload_api_v4.py +++ b/mapillary_tools/upload_api_v4.py @@ -3,9 +3,15 @@ import io import os import random +import sys import typing as T import uuid +if sys.version_info >= (3, 12): + from typing import override +else: + from typing_extensions import override + import requests from .api_v4 import ClusterFileType, request_get, request_post, REQUESTS_TIMEOUT @@ -149,7 +155,7 @@ def __init__(self, *args, **kwargs): ) self._error_ratio = 0.02 - @T.override + @override def upload_shifted_chunks( self, shifted_chunks: T.Iterable[bytes], @@ -176,7 +182,7 @@ def upload_shifted_chunks( ) return uuid.uuid4().hex - @T.override + @override def fetch_offset(self) -> int: if random.random() <= self._error_ratio: raise requests.ConnectionError( From aff015ca26f9127020e5c8db57048c325e5d063b Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Thu, 27 Mar 2025 12:40:04 -0700 Subject: [PATCH 06/15] update --- mapillary_tools/upload.py | 9 +- mapillary_tools/uploader.py | 217 ++++++++++++++++++------------------ 2 files changed, 115 insertions(+), 111 deletions(-) diff --git a/mapillary_tools/upload.py b/mapillary_tools/upload.py index 33b03a016..2ba49a94a 100644 --- a/mapillary_tools/upload.py +++ b/mapillary_tools/upload.py @@ -463,9 +463,9 @@ def _upload_everything( ) if specified_image_metadatas: try: - clusters = mly_uploader.upload_images( + clusters = uploader.ZipImageSequence.prepare_images_and_upload( specified_image_metadatas, - event_payload={"file_type": FileType.IMAGE.value}, + mly_uploader, ) except Exception as ex: raise UploadError(ex) from ex @@ -635,12 +635,11 @@ def _upload_zipfiles( event_payload: uploader.Progress = { "total_sequence_count": len(zip_paths), "sequence_idx": idx, - "file_type": FileType.ZIP.value, "import_path": str(zip_path), } try: - cluster_id = mly_uploader.upload_zipfile( - zip_path, event_payload=event_payload + cluster_id = uploader.ZipImageSequence.prepare_zipfile_and_upload( + zip_path, mly_uploader, event_payload=event_payload ) except Exception as ex: raise UploadError(ex) from ex diff --git a/mapillary_tools/uploader.py 
b/mapillary_tools/uploader.py index 66fdf5963..96234500c 100644 --- a/mapillary_tools/uploader.py +++ b/mapillary_tools/uploader.py @@ -36,6 +36,12 @@ class Progress(T.TypedDict, total=False): # Size in bytes of the zipfile/BlackVue/CAMM entity_size: int + # An "upload_interrupted" will increase it. Reset to 0 if the chunk is uploaded + retries: int + + # md5sum of the zipfile/BlackVue/CAMM in uploading + md5sum: str + # How many sequences in total. It's always 1 when uploading Zipfile/BlackVue/CAMM total_sequence_count: int @@ -48,12 +54,6 @@ class Progress(T.TypedDict, total=False): # MAPSequenceUUID. It is only available for directory uploading sequence_uuid: str - # An "upload_interrupted" will increase it. Reset to 0 if the chunk is uploaded - retries: int - - # md5sum of the zipfile/BlackVue/CAMM in uploading - md5sum: str - # Path to the Zipfile/BlackVue/CAMM import_path: str @@ -191,24 +191,11 @@ def _write_imagebytes_in_zip( zipinfo = zipfile.ZipInfo(arcname, date_time=(1980, 1, 1, 0, 0, 0)) zipf.writestr(zipinfo, image_bytes) - -class Uploader: - def __init__( - self, - user_items: types.UserItem, - emitter: EventEmitter | None = None, - chunk_size: int = 2 * 1024 * 1024, # 2MB - dry_run=False, - ): - jsonschema.validate(instance=user_items, schema=types.UserItemSchema) - self.user_items = user_items - self.emitter = emitter - self.chunk_size = chunk_size - self.dry_run = dry_run - - def upload_zipfile( - self, + @classmethod + def prepare_zipfile_and_upload( + cls, zip_path: Path, + uploader: Uploader, event_payload: Progress | None = None, ) -> str | None: if event_payload is None: @@ -223,26 +210,29 @@ def upload_zipfile( final_event_payload: Progress = { # **event_payload, # type: ignore "sequence_image_count": len(namelist), + "file_type": types.FileType.ZIP.value, } with zip_path.open("rb") as zip_fp: - upload_md5sum = ZipImageSequence.extract_upload_md5sum(zip_fp) + upload_md5sum = cls.extract_upload_md5sum(zip_fp) if upload_md5sum is None: with zip_path.open("rb") as zip_fp: upload_md5sum = utils.md5sum_fp(zip_fp).hexdigest() with zip_path.open("rb") as zip_fp: - return self.upload_stream( + return uploader.upload_stream( zip_fp, upload_api_v4.ClusterFileType.ZIP, upload_md5sum, progress=final_event_payload, ) - def upload_images( - self, + @classmethod + def prepare_images_and_upload( + cls, image_metadatas: T.Sequence[types.ImageMetadata], + uploader: Uploader, event_payload: Progress | None = None, ) -> dict[str, str]: if event_payload is None: @@ -258,10 +248,11 @@ def upload_images( "total_sequence_count": len(sequences), "sequence_image_count": len(sequence), "sequence_uuid": sequence_uuid, + "file_type": types.FileType.IMAGE.value, } with tempfile.NamedTemporaryFile() as fp: - upload_md5sum = ZipImageSequence.zip_sequence_fp(sequence, fp) - cluster_id = self.upload_stream( + upload_md5sum = cls.zip_sequence_fp(sequence, fp) + cluster_id = uploader.upload_stream( fp, upload_api_v4.ClusterFileType.ZIP, upload_md5sum, @@ -271,78 +262,20 @@ def upload_images( ret[sequence_uuid] = cluster_id return ret - def _create_upload_service( - self, session_key: str, cluster_filetype: upload_api_v4.ClusterFileType - ) -> upload_api_v4.UploadService: - upload_service: upload_api_v4.UploadService - if self.dry_run: - upload_service = upload_api_v4.FakeUploadService( - user_access_token=self.user_items["user_upload_token"], - session_key=session_key, - cluster_filetype=cluster_filetype, - ) - else: - upload_service = upload_api_v4.UploadService( - 
user_access_token=self.user_items["user_upload_token"], - session_key=session_key, - cluster_filetype=cluster_filetype, - ) - - return upload_service - - def _handle_upload_exception(self, ex: Exception, progress: Progress) -> None: - retries = progress["retries"] - begin_offset = progress.get("begin_offset") - chunk_size = progress["chunk_size"] - - if retries <= constants.MAX_UPLOAD_RETRIES and _is_retriable_exception(ex): - if self.emitter: - self.emitter.emit("upload_interrupted", progress) - LOG.warning( - # use %s instead of %d because offset could be None - "Error uploading chunk_size %d at begin_offset %s: %s: %s", - chunk_size, - begin_offset, - ex.__class__.__name__, - str(ex), - ) - # Keep things immutable here. Will increment retries in the caller - retries += 1 - if _is_immediate_retry(ex): - sleep_for = 0 - else: - sleep_for = min(2**retries, 16) - LOG.info( - "Retrying in %d seconds (%d/%d)", - sleep_for, - retries, - constants.MAX_UPLOAD_RETRIES, - ) - if sleep_for: - time.sleep(sleep_for) - else: - raise ex - - def _finish_upload_retryable( - self, upload_service: upload_api_v4.UploadService, file_handle: str +class Uploader: + def __init__( + self, + user_items: types.UserItem, + emitter: EventEmitter | None = None, + chunk_size: int = 2 * 1024 * 1024, # 2MB + dry_run=False, ): - if self.dry_run: - cluster_id = "0" - else: - resp = api_v4.finish_upload( - self.user_items["user_upload_token"], - file_handle, - upload_service.cluster_filetype, - organization_id=self.user_items.get("MAPOrganizationKey"), - ) - - data = resp.json() - cluster_id = data.get("cluster_id") - - # TODO: validate cluster_id - - return cluster_id + jsonschema.validate(instance=user_items, schema=types.UserItemSchema) + self.user_items = user_items + self.emitter = emitter + self.chunk_size = chunk_size + self.dry_run = dry_run def upload_stream( self, @@ -401,18 +334,70 @@ def upload_stream( return cluster_id + def _create_upload_service( + self, session_key: str, cluster_filetype: upload_api_v4.ClusterFileType + ) -> upload_api_v4.UploadService: + upload_service: upload_api_v4.UploadService + + if self.dry_run: + upload_service = upload_api_v4.FakeUploadService( + user_access_token=self.user_items["user_upload_token"], + session_key=session_key, + cluster_filetype=cluster_filetype, + ) + else: + upload_service = upload_api_v4.UploadService( + user_access_token=self.user_items["user_upload_token"], + session_key=session_key, + cluster_filetype=cluster_filetype, + ) + + return upload_service + + def _handle_upload_exception(self, ex: Exception, progress: Progress) -> None: + retries = progress["retries"] + begin_offset = progress.get("begin_offset") + chunk_size = progress["chunk_size"] + + if retries <= constants.MAX_UPLOAD_RETRIES and _is_retriable_exception(ex): + if self.emitter: + self.emitter.emit("upload_interrupted", progress) + LOG.warning( + # use %s instead of %d because offset could be None + "Error uploading chunk_size %d at begin_offset %s: %s: %s", + chunk_size, + begin_offset, + ex.__class__.__name__, + str(ex), + ) + # Keep things immutable here. 
Will increment retries in the caller + retries += 1 + if _is_immediate_retry(ex): + sleep_for = 0 + else: + sleep_for = min(2**retries, 16) + LOG.info( + "Retrying in %d seconds (%d/%d)", + sleep_for, + retries, + constants.MAX_UPLOAD_RETRIES, + ) + if sleep_for: + time.sleep(sleep_for) + else: + raise ex + def _chunkize_byte_stream( self, stream: T.IO[bytes], progress: Progress, ) -> T.Generator[bytes, None, None]: - while True: - data = stream.read(self.chunk_size) - if not data: - break - yield data - progress["offset"] += len(data) - progress["chunk_size"] = len(data) + for chunk in upload_api_v4.UploadService.chunkize_byte_stream( + stream, self.chunk_size + ): + yield chunk + progress["offset"] += len(chunk) + progress["chunk_size"] = len(chunk) if self.emitter: self.emitter.emit("upload_progress", progress) @@ -438,6 +423,26 @@ def _upload_stream_retryable( return file_handle + def _finish_upload_retryable( + self, upload_service: upload_api_v4.UploadService, file_handle: str + ): + if self.dry_run: + cluster_id = "0" + else: + resp = api_v4.finish_upload( + self.user_items["user_upload_token"], + file_handle, + upload_service.cluster_filetype, + organization_id=self.user_items.get("MAPOrganizationKey"), + ) + + data = resp.json() + cluster_id = data.get("cluster_id") + + # TODO: validate cluster_id + + return cluster_id + def _validate_metadatas(metadatas: T.Sequence[types.ImageMetadata]): for metadata in metadatas: From a24f43c5538cc67e540e0fb15aa6c6910d13a827 Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Thu, 27 Mar 2025 14:04:51 -0700 Subject: [PATCH 07/15] fix types for progress --- mapillary_tools/upload.py | 8 ++--- mapillary_tools/uploader.py | 71 ++++++++++++++++++++++--------------- 2 files changed, 46 insertions(+), 33 deletions(-) diff --git a/mapillary_tools/upload.py b/mapillary_tools/upload.py index 2ba49a94a..c154319e4 100644 --- a/mapillary_tools/upload.py +++ b/mapillary_tools/upload.py @@ -505,7 +505,7 @@ def _upload_everything( with video_metadata.filename.open("rb") as src_fp: camm_fp = simple_mp4_builder.transform_mp4(src_fp, generator) - event_payload: uploader.Progress = { + progress: uploader.FileProgress = { "total_sequence_count": len(specified_video_metadatas), "sequence_idx": idx, "file_type": video_metadata.filetype.value, @@ -516,7 +516,7 @@ def _upload_everything( T.cast(T.BinaryIO, camm_fp), upload_api_v4.ClusterFileType.CAMM, video_metadata.md5sum, - progress=event_payload, + progress=T.cast(dict[str, T.Any], progress), ) except Exception as ex: raise UploadError(ex) from ex @@ -632,14 +632,14 @@ def _upload_zipfiles( zip_paths: T.Sequence[Path], ) -> None: for idx, zip_path in enumerate(zip_paths): - event_payload: uploader.Progress = { + progress: uploader.FileProgress = { "total_sequence_count": len(zip_paths), "sequence_idx": idx, "import_path": str(zip_path), } try: cluster_id = uploader.ZipImageSequence.prepare_zipfile_and_upload( - zip_path, mly_uploader, event_payload=event_payload + zip_path, mly_uploader, progress=progress ) except Exception as ex: raise UploadError(ex) from ex diff --git a/mapillary_tools/uploader.py b/mapillary_tools/uploader.py index 96234500c..7881d87ff 100644 --- a/mapillary_tools/uploader.py +++ b/mapillary_tools/uploader.py @@ -21,14 +21,15 @@ LOG = logging.getLogger(__name__) -class Progress(T.TypedDict, total=False): - # The size of the chunk, in bytes, that has been uploaded in the last request - chunk_size: int +class UploaderProgress(T.TypedDict, total=True): + """ + Progress data that Uploader cares about. 
+ """ - # File type - file_type: str + # The size of the chunk, in bytes, that has been read and upload + chunk_size: int - begin_offset: int + begin_offset: int | None # How many bytes has been uploaded so far since "upload_start" offset: int @@ -42,6 +43,14 @@ class Progress(T.TypedDict, total=False): # md5sum of the zipfile/BlackVue/CAMM in uploading md5sum: str + # Cluster ID after finishing the upload + cluster_id: str + + +class FileProgress(T.TypedDict, total=False): + # File type + file_type: str + # How many sequences in total. It's always 1 when uploading Zipfile/BlackVue/CAMM total_sequence_count: int @@ -57,8 +66,9 @@ class Progress(T.TypedDict, total=False): # Path to the Zipfile/BlackVue/CAMM import_path: str - # Cluster ID after finishing the upload - cluster_id: str + +class Progress(FileProgress, UploaderProgress): + pass class UploadCancelled(Exception): @@ -196,10 +206,10 @@ def prepare_zipfile_and_upload( cls, zip_path: Path, uploader: Uploader, - event_payload: Progress | None = None, + progress: FileProgress | None = None, ) -> str | None: - if event_payload is None: - event_payload = {} + if progress is None: + progress = T.cast(FileProgress, {}) with zipfile.ZipFile(zip_path) as ziph: namelist = ziph.namelist() @@ -207,8 +217,8 @@ def prepare_zipfile_and_upload( LOG.warning("Skipping empty zipfile: %s", zip_path) return None - final_event_payload: Progress = { - # **event_payload, # type: ignore + final_progress: FileProgress = { + **progress, "sequence_image_count": len(namelist), "file_type": types.FileType.ZIP.value, } @@ -225,7 +235,7 @@ def prepare_zipfile_and_upload( zip_fp, upload_api_v4.ClusterFileType.ZIP, upload_md5sum, - progress=final_event_payload, + progress=T.cast(dict[str, T.Any], final_progress), ) @classmethod @@ -233,17 +243,17 @@ def prepare_images_and_upload( cls, image_metadatas: T.Sequence[types.ImageMetadata], uploader: Uploader, - event_payload: Progress | None = None, + progress: FileProgress | None = None, ) -> dict[str, str]: - if event_payload is None: - event_payload = {} + if progress is None: + progress = T.cast(FileProgress, {}) _validate_metadatas(image_metadatas) sequences = types.group_and_sort_images(image_metadatas) ret: dict[str, str] = {} for sequence_idx, (sequence_uuid, sequence) in enumerate(sequences.items()): - final_event_payload: Progress = { - **event_payload, # type: ignore + final_progress: FileProgress = { + **progress, "sequence_idx": sequence_idx, "total_sequence_count": len(sequences), "sequence_image_count": len(sequence), @@ -256,7 +266,7 @@ def prepare_images_and_upload( fp, upload_api_v4.ClusterFileType.ZIP, upload_md5sum, - final_event_payload, + progress=T.cast(dict[str, T.Any], final_progress), ) if cluster_id is not None: ret[sequence_uuid] = cluster_id @@ -282,7 +292,7 @@ def upload_stream( fp: T.IO[bytes], cluster_filetype: upload_api_v4.ClusterFileType, upload_md5sum: str, - progress: Progress | None = None, + progress: dict[str, T.Any] | None = None, ) -> str | None: if progress is None: progress = {} @@ -303,6 +313,7 @@ def upload_stream( progress["md5sum"] = upload_md5sum progress["chunk_size"] = self.chunk_size progress["retries"] = 0 + progress["begin_offset"] = None if self.emitter: try: @@ -314,10 +325,10 @@ def upload_stream( while True: try: file_handle = self._upload_stream_retryable( - upload_service, fp, progress + upload_service, fp, T.cast(UploaderProgress, progress) ) except Exception as ex: - self._handle_upload_exception(ex, progress) + self._handle_upload_exception(ex, 
T.cast(UploaderProgress, progress)) else: break @@ -354,7 +365,9 @@ def _create_upload_service( return upload_service - def _handle_upload_exception(self, ex: Exception, progress: Progress) -> None: + def _handle_upload_exception( + self, ex: Exception, progress: UploaderProgress + ) -> None: retries = progress["retries"] begin_offset = progress.get("begin_offset") chunk_size = progress["chunk_size"] @@ -387,10 +400,10 @@ def _handle_upload_exception(self, ex: Exception, progress: Progress) -> None: else: raise ex - def _chunkize_byte_stream( + def _chunk_with_progress_emitted( self, stream: T.IO[bytes], - progress: Progress, + progress: UploaderProgress, ) -> T.Generator[bytes, None, None]: for chunk in upload_api_v4.UploadService.chunkize_byte_stream( stream, self.chunk_size @@ -405,7 +418,7 @@ def _upload_stream_retryable( self, upload_service: upload_api_v4.UploadService, fp: T.IO[bytes], - progress: Progress, + progress: UploaderProgress, ) -> str: begin_offset = upload_service.fetch_offset() @@ -417,7 +430,7 @@ def _upload_stream_retryable( fp.seek(begin_offset, io.SEEK_SET) - shifted_chunks = self._chunkize_byte_stream(fp, progress) + shifted_chunks = self._chunk_with_progress_emitted(fp, progress) file_handle = upload_service.upload_shifted_chunks(shifted_chunks, begin_offset) @@ -425,7 +438,7 @@ def _upload_stream_retryable( def _finish_upload_retryable( self, upload_service: upload_api_v4.UploadService, file_handle: str - ): + ) -> str: if self.dry_run: cluster_id = "0" else: From 20ce83d4d40cb4a4326c78ff8d96bf86f8973d6f Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Thu, 27 Mar 2025 14:11:22 -0700 Subject: [PATCH 08/15] update types --- mapillary_tools/upload.py | 46 ++++++++++++++++++++------------------- 1 file changed, 24 insertions(+), 22 deletions(-) diff --git a/mapillary_tools/upload.py b/mapillary_tools/upload.py index c154319e4..b0e280b4d 100644 --- a/mapillary_tools/upload.py +++ b/mapillary_tools/upload.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import json import logging import os @@ -27,7 +29,7 @@ from .mp4 import simple_mp4_builder from .types import FileType -JSONDict = T.Dict[str, T.Union[str, int, float, None]] +JSONDict = dict[str, str | int | float | None] LOG = logging.getLogger(__name__) MAPILLARY_DISABLE_API_LOGGING = os.getenv("MAPILLARY_DISABLE_API_LOGGING") @@ -45,8 +47,8 @@ def __init__(self, inner_ex) -> None: def _load_validate_metadatas_from_desc_path( - desc_path: T.Optional[str], import_paths: T.Sequence[Path] -) -> T.List[types.Metadata]: + desc_path: str | None, import_paths: T.Sequence[Path] +) -> list[types.Metadata]: is_default_desc_path = False if desc_path is None: is_default_desc_path = True @@ -64,7 +66,7 @@ def _load_validate_metadatas_from_desc_path( "The description path must be specified (with --desc_path) when uploading a single file", ) - descs: T.List[types.DescriptionOrError] = [] + descs: list[types.DescriptionOrError] = [] if desc_path == "-": try: @@ -117,7 +119,7 @@ def _load_validate_metadatas_from_desc_path( def zip_images( import_path: Path, zip_dir: Path, - desc_path: T.Optional[str] = None, + desc_path: str | None = None, ): if not import_path.is_dir(): raise exceptions.MapillaryFileNotFoundError( @@ -162,7 +164,7 @@ def upload_start(payload: uploader.Progress): def _setup_write_upload_history( emitter: uploader.EventEmitter, params: JSONDict, - metadatas: T.Optional[T.List[types.Metadata]] = None, + metadatas: list[types.Metadata] | None = None, ) -> None: @emitter.on("upload_finished") def 
upload_finished(payload: uploader.Progress): @@ -190,7 +192,7 @@ def upload_finished(payload: uploader.Progress): def _setup_tdqm(emitter: uploader.EventEmitter) -> None: - upload_pbar: T.Optional[tqdm] = None + upload_pbar: tqdm | None = None @emitter.on("upload_fetch_offset") def upload_fetch_offset(payload: uploader.Progress) -> None: @@ -201,7 +203,7 @@ def upload_fetch_offset(payload: uploader.Progress) -> None: nth = payload["sequence_idx"] + 1 total = payload["total_sequence_count"] - import_path: T.Optional[str] = payload.get("import_path") + import_path: str | None = payload.get("import_path") filetype = payload.get("file_type", "unknown").upper() if import_path is None: _desc = f"Uploading {filetype} ({nth}/{total})" @@ -276,7 +278,7 @@ class _APIStats(uploader.Progress, total=False): def _setup_api_stats(emitter: uploader.EventEmitter): - all_stats: T.List[_APIStats] = [] + all_stats: list[_APIStats] = [] @emitter.on("upload_start") def collect_start_time(payload: _APIStats) -> None: @@ -309,7 +311,7 @@ def collect_end_time(payload: _APIStats) -> None: return all_stats -def _summarize(stats: T.Sequence[_APIStats]) -> T.Dict: +def _summarize(stats: T.Sequence[_APIStats]) -> dict: total_image_count = sum(s.get("sequence_image_count", 0) for s in stats) total_uploaded_sequence_count = len(stats) # note that stats[0]["total_sequence_count"] not always same as total_uploaded_sequence_count @@ -341,7 +343,7 @@ def _summarize(stats: T.Sequence[_APIStats]) -> T.Dict: def _show_upload_summary(stats: T.Sequence[_APIStats]): - grouped: T.Dict[str, T.List[_APIStats]] = {} + grouped: dict[str, list[_APIStats]] = {} for stat in stats: grouped.setdefault(stat.get("file_type", "unknown"), []).append(stat) @@ -365,7 +367,7 @@ def _show_upload_summary(stats: T.Sequence[_APIStats]): LOG.info("%8.1fs upload time", summary["time"]) -def _api_logging_finished(summary: T.Dict): +def _api_logging_finished(summary: dict): if MAPILLARY_DISABLE_API_LOGGING: return @@ -383,7 +385,7 @@ def _api_logging_finished(summary: T.Dict): LOG.warning("Error from API Logging for action %s", action, exc_info=True) -def _api_logging_failed(payload: T.Dict, exc: Exception): +def _api_logging_failed(payload: dict, exc: Exception): if MAPILLARY_DISABLE_API_LOGGING: return @@ -403,11 +405,11 @@ def _api_logging_failed(payload: T.Dict, exc: Exception): def _load_descs( - _metadatas_from_process: T.Optional[T.Sequence[types.MetadataOrError]], - desc_path: T.Optional[str], + _metadatas_from_process: T.Sequence[types.MetadataOrError] | None, + desc_path: str | None, import_paths: T.Sequence[Path], -) -> T.List[types.Metadata]: - metadatas: T.List[types.Metadata] +) -> list[types.Metadata]: + metadatas: list[types.Metadata] if _metadatas_from_process is not None: metadatas = [ @@ -439,7 +441,7 @@ def _load_descs( def _find_metadata_with_filename_existed_in( metadatas: T.Sequence[_M], paths: T.Sequence[Path] -) -> T.List[_M]: +) -> list[_M]: resolved_image_paths = set(p.resolve() for p in paths) return [d for d in metadatas if d.filename.resolve() in resolved_image_paths] @@ -488,7 +490,7 @@ def _upload_everything( assert isinstance(video_metadata.md5sum, str), "md5sum should be updated" # extract telemetry measurements from GoPro videos - telemetry_measurements: T.List[camm_parser.TelemetryMeasurement] = [] + telemetry_measurements: list[camm_parser.TelemetryMeasurement] = [] if MAPILLARY__EXPERIMENTAL_ENABLE_IMU == "YES": if video_metadata.filetype is FileType.GOPRO: with video_metadata.filename.open("rb") as fp: @@ -528,10 
+530,10 @@ def _upload_everything( def upload( - import_path: T.Union[Path, T.Sequence[Path]], + import_path: Path | T.Sequence[Path], user_items: types.UserItem, - desc_path: T.Optional[str] = None, - _metadatas_from_process: T.Optional[T.Sequence[types.MetadataOrError]] = None, + desc_path: str | None = None, + _metadatas_from_process: T.Sequence[types.MetadataOrError] | None = None, dry_run=False, skip_subfolders=False, ) -> None: From ffa163945caac26773ae9f13b22ea174dd9a265f Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Thu, 27 Mar 2025 14:15:20 -0700 Subject: [PATCH 09/15] types --- mapillary_tools/upload.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mapillary_tools/upload.py b/mapillary_tools/upload.py index b0e280b4d..a683f27fe 100644 --- a/mapillary_tools/upload.py +++ b/mapillary_tools/upload.py @@ -29,7 +29,7 @@ from .mp4 import simple_mp4_builder from .types import FileType -JSONDict = dict[str, str | int | float | None] +JSONDict = dict[str, T.Union[str, int, float, None]] LOG = logging.getLogger(__name__) MAPILLARY_DISABLE_API_LOGGING = os.getenv("MAPILLARY_DISABLE_API_LOGGING") From 6635f6803cd96620d1d6a222b1fc6d3626f9117c Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Thu, 27 Mar 2025 14:22:34 -0700 Subject: [PATCH 10/15] types --- mapillary_tools/upload.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mapillary_tools/upload.py b/mapillary_tools/upload.py index a683f27fe..e636fdd0a 100644 --- a/mapillary_tools/upload.py +++ b/mapillary_tools/upload.py @@ -29,7 +29,7 @@ from .mp4 import simple_mp4_builder from .types import FileType -JSONDict = dict[str, T.Union[str, int, float, None]] +JSONDict = T.Dict[str, T.Union[str, int, float, None]] LOG = logging.getLogger(__name__) MAPILLARY_DISABLE_API_LOGGING = os.getenv("MAPILLARY_DISABLE_API_LOGGING") From 43c5abaa27d3d7af06ec115f4fafa0254be92809 Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Thu, 27 Mar 2025 14:30:11 -0700 Subject: [PATCH 11/15] add cluster_id to the progress payload --- mapillary_tools/upload.py | 4 ++-- mapillary_tools/uploader.py | 19 +++++++++++-------- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/mapillary_tools/upload.py b/mapillary_tools/upload.py index e636fdd0a..67c8c78ef 100644 --- a/mapillary_tools/upload.py +++ b/mapillary_tools/upload.py @@ -507,7 +507,7 @@ def _upload_everything( with video_metadata.filename.open("rb") as src_fp: camm_fp = simple_mp4_builder.transform_mp4(src_fp, generator) - progress: uploader.FileProgress = { + progress: uploader.SequenceProgress = { "total_sequence_count": len(specified_video_metadatas), "sequence_idx": idx, "file_type": video_metadata.filetype.value, @@ -634,7 +634,7 @@ def _upload_zipfiles( zip_paths: T.Sequence[Path], ) -> None: for idx, zip_path in enumerate(zip_paths): - progress: uploader.FileProgress = { + progress: uploader.SequenceProgress = { "total_sequence_count": len(zip_paths), "sequence_idx": idx, "import_path": str(zip_path), diff --git a/mapillary_tools/uploader.py b/mapillary_tools/uploader.py index 7881d87ff..b196310d5 100644 --- a/mapillary_tools/uploader.py +++ b/mapillary_tools/uploader.py @@ -47,7 +47,9 @@ class UploaderProgress(T.TypedDict, total=True): cluster_id: str -class FileProgress(T.TypedDict, total=False): +class SequenceProgress(T.TypedDict, total=False): + """Progress data at sequence level""" + # File type file_type: str @@ -67,7 +69,7 @@ class FileProgress(T.TypedDict, total=False): import_path: str -class Progress(FileProgress, UploaderProgress): +class 
Progress(SequenceProgress, UploaderProgress): pass @@ -206,10 +208,10 @@ def prepare_zipfile_and_upload( cls, zip_path: Path, uploader: Uploader, - progress: FileProgress | None = None, + progress: SequenceProgress | None = None, ) -> str | None: if progress is None: - progress = T.cast(FileProgress, {}) + progress = T.cast(SequenceProgress, {}) with zipfile.ZipFile(zip_path) as ziph: namelist = ziph.namelist() @@ -217,7 +219,7 @@ def prepare_zipfile_and_upload( LOG.warning("Skipping empty zipfile: %s", zip_path) return None - final_progress: FileProgress = { + final_progress: SequenceProgress = { **progress, "sequence_image_count": len(namelist), "file_type": types.FileType.ZIP.value, @@ -243,16 +245,16 @@ def prepare_images_and_upload( cls, image_metadatas: T.Sequence[types.ImageMetadata], uploader: Uploader, - progress: FileProgress | None = None, + progress: SequenceProgress | None = None, ) -> dict[str, str]: if progress is None: - progress = T.cast(FileProgress, {}) + progress = T.cast(SequenceProgress, {}) _validate_metadatas(image_metadatas) sequences = types.group_and_sort_images(image_metadatas) ret: dict[str, str] = {} for sequence_idx, (sequence_uuid, sequence) in enumerate(sequences.items()): - final_progress: FileProgress = { + final_progress: SequenceProgress = { **progress, "sequence_idx": sequence_idx, "total_sequence_count": len(sequences), @@ -339,6 +341,7 @@ def upload_stream( # TODO: retry here cluster_id = self._finish_upload_retryable(upload_service, file_handle) + progress["cluster_id"] = cluster_id if self.emitter: self.emitter.emit("upload_finished", progress) From 4c6c09f7964a0be43b0b47efd77937106749be2c Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Thu, 27 Mar 2025 14:32:44 -0700 Subject: [PATCH 12/15] fix tests --- tests/unit/test_uploader.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/tests/unit/test_uploader.py b/tests/unit/test_uploader.py index da1aed0e8..5e229442f 100644 --- a/tests/unit/test_uploader.py +++ b/tests/unit/test_uploader.py @@ -64,8 +64,9 @@ def test_upload_images(setup_unittest_data: py.path.local, setup_upload: py.path "filetype": "image", }, ] - resp = mly_uploader.upload_images( - [types.from_desc(T.cast(T.Any, desc)) for desc in descs] + resp = uploader.ZipImageSequence.prepare_images_and_upload( + [types.from_desc(T.cast(T.Any, desc)) for desc in descs], + mly_uploader, ) assert len(resp) == 1 assert len(setup_upload.listdir()) == 1 @@ -115,8 +116,9 @@ def test_upload_images_multiple_sequences( }, dry_run=True, ) - resp = mly_uploader.upload_images( - [types.from_desc(T.cast(T.Any, desc)) for desc in descs] + resp = uploader.ZipImageSequence.prepare_images_and_upload( + [types.from_desc(T.cast(T.Any, desc)) for desc in descs], + mly_uploader, ) assert len(resp) == 2 assert len(setup_upload.listdir()) == 2 @@ -180,8 +182,10 @@ def test_upload_zip( emitter=emitter, ) for zip_path in zip_dir.listdir(): - resp = mly_uploader.upload_zipfile(Path(zip_path)) - assert resp == "0" + cluster = uploader.ZipImageSequence.prepare_zipfile_and_upload( + Path(zip_path), mly_uploader + ) + assert cluster == "0" descs = _validate_zip_dir(setup_upload) assert 3 == len(descs) From 8db245e1a7d7d324bcd6ee5d7137030870d9b624 Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Thu, 27 Mar 2025 14:37:14 -0700 Subject: [PATCH 13/15] fix types --- mapillary_tools/upload.py | 2 +- mapillary_tools/uploader.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/mapillary_tools/upload.py b/mapillary_tools/upload.py 
From 8db245e1a7d7d324bcd6ee5d7137030870d9b624 Mon Sep 17 00:00:00 2001
From: Tao Peng
Date: Thu, 27 Mar 2025 14:37:14 -0700
Subject: [PATCH 13/15] fix types

---
 mapillary_tools/upload.py   | 2 +-
 mapillary_tools/uploader.py | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/mapillary_tools/upload.py b/mapillary_tools/upload.py
index 67c8c78ef..f93c11e43 100644
--- a/mapillary_tools/upload.py
+++ b/mapillary_tools/upload.py
@@ -518,7 +518,7 @@ def _upload_everything(
                 T.cast(T.BinaryIO, camm_fp),
                 upload_api_v4.ClusterFileType.CAMM,
                 video_metadata.md5sum,
-                progress=T.cast(dict[str, T.Any], progress),
+                progress=T.cast(T.Dict[str, T.Any], progress),
             )
         except Exception as ex:
             raise UploadError(ex) from ex
diff --git a/mapillary_tools/uploader.py b/mapillary_tools/uploader.py
index b196310d5..da815f4eb 100644
--- a/mapillary_tools/uploader.py
+++ b/mapillary_tools/uploader.py
@@ -268,7 +268,7 @@ def prepare_images_and_upload(
                 fp,
                 upload_api_v4.ClusterFileType.ZIP,
                 upload_md5sum,
-                progress=T.cast(dict[str, T.Any], final_progress),
+                progress=T.cast(T.Dict[str, T.Any], final_progress),
             )
             if cluster_id is not None:
                 ret[sequence_uuid] = cluster_id

From 4f5880bc160286860163dedf7c7fff947dc9bea3 Mon Sep 17 00:00:00 2001
From: Tao Peng
Date: Thu, 27 Mar 2025 14:38:06 -0700
Subject: [PATCH 14/15] types

---
 mapillary_tools/uploader.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/mapillary_tools/uploader.py b/mapillary_tools/uploader.py
index da815f4eb..956900409 100644
--- a/mapillary_tools/uploader.py
+++ b/mapillary_tools/uploader.py
@@ -237,7 +237,7 @@ def prepare_zipfile_and_upload(
             zip_fp,
             upload_api_v4.ClusterFileType.ZIP,
             upload_md5sum,
-            progress=T.cast(dict[str, T.Any], final_progress),
+            progress=T.cast(T.Dict[str, T.Any], final_progress),
         )

     @classmethod
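PATCH 13 and PATCH 14 look cosmetic but are not: the first argument of T.cast(...) is an ordinary runtime expression, so dict[str, T.Any] there subscripts the builtin dict when the line runs, which raises TypeError on Python 3.8 and older (presumably the supported floor here; a `from __future__ import annotations` only defers annotations, not expressions like this). T.Dict[str, T.Any] is safe on every version, and the cast stays a runtime no-op:

    import typing as T

    progress: T.Dict[str, T.Any] = {"retries": 0}

    # T.cast only informs the type checker; at runtime it returns its
    # second argument unchanged.
    same = T.cast(T.Dict[str, T.Any], progress)
    assert same is progress
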
From 66c99a1ae268c626015d8bf5b426950ae4bdcf23 Mon Sep 17 00:00:00 2001
From: Tao Peng
Date: Thu, 27 Mar 2025 15:07:05 -0700
Subject: [PATCH 15/15] pass session key to uploader instead of md5sum

---
 mapillary_tools/upload.py   |  8 ++++-
 mapillary_tools/uploader.py | 71 +++++++++++++++++++++++--------------
 tests/unit/test_uploader.py |  4 +--
 3 files changed, 54 insertions(+), 29 deletions(-)

diff --git a/mapillary_tools/upload.py b/mapillary_tools/upload.py
index f93c11e43..8754efc27 100644
--- a/mapillary_tools/upload.py
+++ b/mapillary_tools/upload.py
@@ -512,12 +512,18 @@ def _upload_everything(
             "sequence_idx": idx,
             "file_type": video_metadata.filetype.value,
             "import_path": str(video_metadata.filename),
+            "md5sum": video_metadata.md5sum,
         }
+
+        session_key = uploader._session_key(
+            video_metadata.md5sum, upload_api_v4.ClusterFileType.CAMM
+        )
+
         try:
             cluster_id = mly_uploader.upload_stream(
                 T.cast(T.BinaryIO, camm_fp),
                 upload_api_v4.ClusterFileType.CAMM,
-                video_metadata.md5sum,
+                session_key,
                 progress=T.cast(T.Dict[str, T.Any], progress),
             )
         except Exception as ex:
             raise UploadError(ex) from ex
diff --git a/mapillary_tools/uploader.py b/mapillary_tools/uploader.py
index 956900409..98052e750 100644
--- a/mapillary_tools/uploader.py
+++ b/mapillary_tools/uploader.py
@@ -40,9 +40,6 @@ class UploaderProgress(T.TypedDict, total=True):
     # An "upload_interrupted" will increase it. Reset to 0 if the chunk is uploaded
     retries: int

-    # md5sum of the zipfile/BlackVue/CAMM in uploading
-    md5sum: str
-
     # Cluster ID after finishing the upload
     cluster_id: str

@@ -50,6 +47,9 @@ class SequenceProgress(T.TypedDict, total=False):
     """Progress data at sequence level"""

+    # md5sum of the zipfile/BlackVue/CAMM in uploading
+    md5sum: str
+
     # File type
     file_type: str

@@ -122,11 +122,12 @@ def zip_images(
             f".mly_zip_{uuid.uuid4()}_{sequence_uuid}_{os.getpid()}_{int(time.time())}"
         )
         upload_md5sum = types.update_sequence_md5sum(sequence)
-        zip_filename = zip_dir.joinpath(f"mly_tools_{upload_md5sum}.zip")
+        filename = _session_key(upload_md5sum, upload_api_v4.ClusterFileType.ZIP)
+        zip_filename = zip_dir.joinpath(filename)
         with wip_file_context(wip_zip_filename, zip_filename) as wip_path:
             with wip_path.open("wb") as wip_fp:
                 actual_md5sum = cls.zip_sequence_fp(sequence, wip_fp)
-            assert actual_md5sum == upload_md5sum
+            assert actual_md5sum == upload_md5sum, "md5sum mismatch"

     @classmethod
     def zip_sequence_fp(
@@ -219,12 +220,6 @@ def prepare_zipfile_and_upload(
             LOG.warning("Skipping empty zipfile: %s", zip_path)
             return None

-        final_progress: SequenceProgress = {
-            **progress,
-            "sequence_image_count": len(namelist),
-            "file_type": types.FileType.ZIP.value,
-        }
-
         with zip_path.open("rb") as zip_fp:
             upload_md5sum = cls.extract_upload_md5sum(zip_fp)

@@ -232,11 +227,20 @@ def prepare_zipfile_and_upload(
         with zip_path.open("rb") as zip_fp:
             upload_md5sum = utils.md5sum_fp(zip_fp).hexdigest()

+        final_progress: SequenceProgress = {
+            **progress,
+            "sequence_image_count": len(namelist),
+            "file_type": types.FileType.ZIP.value,
+            "md5sum": upload_md5sum,
+        }
+
+        session_key = _session_key(upload_md5sum, upload_api_v4.ClusterFileType.ZIP)
+
         with zip_path.open("rb") as zip_fp:
             return uploader.upload_stream(
                 zip_fp,
                 upload_api_v4.ClusterFileType.ZIP,
-                upload_md5sum,
+                session_key,
                 progress=T.cast(T.Dict[str, T.Any], final_progress),
             )

@@ -262,12 +266,20 @@ def prepare_images_and_upload(
                 "sequence_uuid": sequence_uuid,
                 "file_type": types.FileType.IMAGE.value,
             }
+
             with tempfile.NamedTemporaryFile() as fp:
                 upload_md5sum = cls.zip_sequence_fp(sequence, fp)
+
+                final_progress["md5sum"] = upload_md5sum
+
+                session_key = _session_key(
+                    upload_md5sum, upload_api_v4.ClusterFileType.ZIP
+                )
+
                 cluster_id = uploader.upload_stream(
                     fp,
                     upload_api_v4.ClusterFileType.ZIP,
-                    upload_md5sum,
+                    session_key,
                     progress=T.cast(T.Dict[str, T.Any], final_progress),
                 )
                 if cluster_id is not None:
                     ret[sequence_uuid] = cluster_id
@@ -293,7 +305,7 @@ def upload_stream(
         self,
         fp: T.IO[bytes],
         cluster_filetype: upload_api_v4.ClusterFileType,
-        upload_md5sum: str,
+        session_key: str,
         progress: dict[str, T.Any] | None = None,
     ) -> str | None:
         if progress is None:
@@ -302,17 +314,9 @@ def upload_stream(
         fp.seek(0, io.SEEK_END)
         entity_size = fp.tell()

-        SUFFIX_MAP: dict[upload_api_v4.ClusterFileType, str] = {
-            upload_api_v4.ClusterFileType.ZIP: ".zip",
-            upload_api_v4.ClusterFileType.CAMM: ".mp4",
-            upload_api_v4.ClusterFileType.BLACKVUE: ".mp4",
-        }
-        session_key = f"mly_tools_{upload_md5sum}{SUFFIX_MAP[cluster_filetype]}"
-
         upload_service = self._create_upload_service(session_key, cluster_filetype)

         progress["entity_size"] = entity_size
-        progress["md5sum"] = upload_md5sum
         progress["chunk_size"] = self.chunk_size
         progress["retries"] = 0
         progress["begin_offset"] = None
@@ -321,7 +325,7 @@ def upload_stream(
             try:
                 self.emitter.emit("upload_start", progress)
             except UploadCancelled:
-                # throw in upload_start only
+                # TODO: Right now it is thrown in upload_start only
                 return None

         while True:
@@ -412,6 +416,7 @@ def _chunk_with_progress_emitted(
             stream, self.chunk_size
         ):
             yield chunk
+
             progress["offset"] += len(chunk)
             progress["chunk_size"] = len(chunk)
             if self.emitter:
@@ -423,6 +428,8 @@ def _upload_stream_retryable(
         fp: T.IO[bytes],
         progress: UploaderProgress,
     ) -> str:
+        """Upload the stream with safe retries guaranteed"""
+
         begin_offset = upload_service.fetch_offset()
         progress["begin_offset"] = begin_offset
@@ -435,13 +442,13 @@ def _upload_stream_retryable(

         shifted_chunks = self._chunk_with_progress_emitted(fp, progress)

-        file_handle = upload_service.upload_shifted_chunks(shifted_chunks, begin_offset)
-
-        return file_handle
+        return upload_service.upload_shifted_chunks(shifted_chunks, begin_offset)

     def _finish_upload_retryable(
         self, upload_service: upload_api_v4.UploadService, file_handle: str
     ) -> str:
+        """Finish upload with safe retries guaranteed"""
+
         if self.dry_run:
             cluster_id = "0"
         else:
@@ -519,3 +526,15 @@ def _is_retriable_exception(ex: Exception):
             return True

     return False
+
+
+def _session_key(
+    upload_md5sum: str, cluster_filetype: upload_api_v4.ClusterFileType
+) -> str:
+    SUFFIX_MAP: dict[upload_api_v4.ClusterFileType, str] = {
+        upload_api_v4.ClusterFileType.ZIP: ".zip",
+        upload_api_v4.ClusterFileType.CAMM: ".mp4",
+        upload_api_v4.ClusterFileType.BLACKVUE: ".mp4",
+    }
+
+    return f"mly_tools_{upload_md5sum}{SUFFIX_MAP[cluster_filetype]}"
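_session_key is now the single place where a payload's checksum becomes an upload session name, shared by zip_images (the on-disk zipfile name) and every upload path. Because the key is a pure function of the md5sum, re-running an interrupted upload derives the same session, so fetch_offset can resume it. For a hypothetical digest:

    from mapillary_tools import upload_api_v4
    from mapillary_tools.uploader import _session_key

    # Hypothetical md5 hex digest:
    _session_key("0123456789abcdef0123456789abcdef",
                 upload_api_v4.ClusterFileType.CAMM)
    # -> 'mly_tools_0123456789abcdef0123456789abcdef.mp4'
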
diff --git a/tests/unit/test_uploader.py b/tests/unit/test_uploader.py
index 5e229442f..fca750fdc 100644
--- a/tests/unit/test_uploader.py
+++ b/tests/unit/test_uploader.py
@@ -209,11 +209,11 @@ def test_upload_blackvue(
         resp = mly_uploader.upload_stream(
             fp,
             upload_api_v4.ClusterFileType.BLACKVUE,
-            "this_is_a_blackvue_checksum",
+            "this_is_a_blackvue.mp4",
         )
     assert resp == "0"
     for mp4_path in setup_upload.listdir():
-        assert os.path.basename(mp4_path) == "mly_tools_this_is_a_blackvue_checksum.mp4"
+        assert os.path.basename(mp4_path) == "this_is_a_blackvue.mp4"
         with open(mp4_path, "rb") as fp:
             assert fp.read() == b"this is a fake video"
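Taken together, PATCH 15 moves session-key derivation out of Uploader.upload_stream and into its callers, which the BlackVue test above reflects: the third argument is now the session key, used verbatim as the upload filename, rather than a checksum. A condensed sketch of the new calling convention (upload_camm is a hypothetical wrapper; the real call sites are _upload_everything and the ZipImageSequence classmethods):

    from __future__ import annotations

    import typing as T

    from mapillary_tools import uploader, upload_api_v4

    def upload_camm(
        mly_uploader: uploader.Uploader, fp: T.BinaryIO, md5sum: str
    ) -> str | None:
        # Callers derive the session key from the checksum themselves...
        session_key = uploader._session_key(
            md5sum, upload_api_v4.ClusterFileType.CAMM
        )
        # ...and report the checksum via the sequence-level progress
        # payload instead of passing it to upload_stream.
        progress: T.Dict[str, T.Any] = {"md5sum": md5sum}
        return mly_uploader.upload_stream(
            fp,
            upload_api_v4.ClusterFileType.CAMM,
            session_key,
            progress=progress,
        )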