From 0e66145a7e6b009599e9bd6bf6b4484a15f81eb6 Mon Sep 17 00:00:00 2001
From: Tao Peng
Date: Tue, 10 Jun 2025 10:43:51 -0700
Subject: [PATCH 01/15] simplify upload API

---
 mapillary_tools/upload.py        | 10 ++--
 mapillary_tools/upload_api_v4.py | 13 +----
 mapillary_tools/uploader.py      | 84 ++++++++++++++++----------------
 3 files changed, 50 insertions(+), 57 deletions(-)

diff --git a/mapillary_tools/upload.py b/mapillary_tools/upload.py
index 3b1f8bef0..8dfb26bda 100644
--- a/mapillary_tools/upload.py
+++ b/mapillary_tools/upload.py
@@ -514,7 +514,7 @@ def _gen_upload_videos(
         }
 
         session_key = uploader._session_key(
-            video_metadata.md5sum, upload_api_v4.ClusterFileType.CAMM
+            video_metadata.md5sum, api_v4.ClusterFileType.CAMM
         )
 
         try:
@@ -525,12 +525,16 @@
             )
 
             # Upload the mp4 stream
-            cluster_id = mly_uploader.upload_stream(
+            file_handle = mly_uploader.upload_stream(
                 T.cast(T.IO[bytes], camm_fp),
-                upload_api_v4.ClusterFileType.CAMM,
                 session_key,
                 progress=T.cast(T.Dict[str, T.Any], progress),
             )
+            cluster_id = mly_uploader.finish_upload(
+                file_handle,
+                api_v4.ClusterFileType.CAMM,
+                progress=T.cast(T.Dict[str, T.Any], progress),
+            )
         except Exception as ex:
             yield video_metadata, uploader.UploadResult(error=ex)
         else:
diff --git a/mapillary_tools/upload_api_v4.py b/mapillary_tools/upload_api_v4.py
index 3c3b5bb20..276afc64c 100644
--- a/mapillary_tools/upload_api_v4.py
+++ b/mapillary_tools/upload_api_v4.py
@@ -14,7 +14,7 @@
 
 import requests
 
-from .api_v4 import ClusterFileType, request_get, request_post, REQUESTS_TIMEOUT
+from .api_v4 import request_get, request_post, REQUESTS_TIMEOUT
 
 MAPILLARY_UPLOAD_ENDPOINT = os.getenv(
     "MAPILLARY_UPLOAD_ENDPOINT", "https://rupload.facebook.com/mapillary_public_uploads"
@@ -31,24 +31,14 @@ class UploadService:
     user_access_token: str
     session_key: str
-    cluster_filetype: ClusterFileType
-
-    MIME_BY_CLUSTER_TYPE: dict[ClusterFileType, str] = {
-        ClusterFileType.ZIP: "application/zip",
-        ClusterFileType.BLACKVUE: "video/mp4",
-        ClusterFileType.CAMM: "video/mp4",
-    }
 
     def __init__(
         self,
         user_access_token: str,
         session_key: str,
-        cluster_filetype: ClusterFileType,
     ):
         self.user_access_token = user_access_token
         self.session_key = session_key
-        # Validate the input
-        self.cluster_filetype = cluster_filetype
 
     def fetch_offset(self) -> int:
         headers = {
@@ -124,7 +114,6 @@ def upload_shifted_chunks(
             "Authorization": f"OAuth {self.user_access_token}",
             "Offset": f"{offset}",
             "X-Entity-Name": self.session_key,
-            "X-Entity-Type": self.MIME_BY_CLUSTER_TYPE[self.cluster_filetype],
         }
         url = f"{MAPILLARY_UPLOAD_ENDPOINT}/{self.session_key}"
         resp = request_post(
diff --git a/mapillary_tools/uploader.py b/mapillary_tools/uploader.py
index 13a631b0a..069436328 100644
--- a/mapillary_tools/uploader.py
+++ b/mapillary_tools/uploader.py
@@ -174,7 +174,7 @@ def _wip_file_context(cls, wip_path: Path):
             upload_md5sum = utils.md5sum_fp(fp).hexdigest()
 
         done_path = wip_path.parent.joinpath(
-            _session_key(upload_md5sum, upload_api_v4.ClusterFileType.ZIP)
+            _session_key(upload_md5sum, api_v4.ClusterFileType.ZIP)
         )
 
         try:
@@ -292,19 +292,22 @@ def upload_zipfile(
             "upload_md5sum": upload_md5sum,
         }
 
-        upload_session_key = _session_key(
-            upload_md5sum, upload_api_v4.ClusterFileType.ZIP
-        )
+        # Send a copy of the input progress to each upload session to avoid modifying the original
+        mutable_progress: dict[str, T.Any] = {**progress, **sequence_progress}
+
+        upload_session_key = _session_key(upload_md5sum, api_v4.ClusterFileType.ZIP)
 
         with zip_path.open("rb") as zip_fp:
-            return uploader.upload_stream(
-                zip_fp,
-                upload_api_v4.ClusterFileType.ZIP,
-                upload_session_key,
-                # Send the copy of the input progress to each upload session, to avoid modifying the original one
-                progress=T.cast(T.Dict[str, T.Any], {**progress, **sequence_progress}),
+            file_handle = uploader.upload_stream(
+                zip_fp, upload_session_key, progress=mutable_progress
             )
 
+            cluster_id = uploader.finish_upload(
+                file_handle, api_v4.ClusterFileType.ZIP, progress=mutable_progress
+            )
+
+        return cluster_id
+
     @classmethod
     def zip_images_and_upload(
         cls,
@@ -326,6 +329,8 @@
                 "file_type": types.FileType.IMAGE.value,
             }
 
+            mutable_progress: dict[str, T.Any] = {**progress, **sequence_progress}
+
             try:
                 _validate_metadatas(sequence)
             except Exception as ex:
@@ -345,17 +350,17 @@
                 upload_md5sum = utils.md5sum_fp(fp).hexdigest()
 
                 upload_session_key = _session_key(
-                    upload_md5sum, upload_api_v4.ClusterFileType.ZIP
+                    upload_md5sum, api_v4.ClusterFileType.ZIP
                 )
 
                 try:
-                    cluster_id = uploader.upload_stream(
-                        fp,
-                        upload_api_v4.ClusterFileType.ZIP,
-                        upload_session_key,
-                        progress=T.cast(
-                            T.Dict[str, T.Any], {**progress, **sequence_progress}
-                        ),
+                    file_handle = uploader.upload_stream(
+                        fp, upload_session_key, progress=mutable_progress
+                    )
+                    cluster_id = uploader.finish_upload(
+                        file_handle,
+                        api_v4.ClusterFileType.ZIP,
+                        progress=mutable_progress,
                     )
                 except Exception as ex:
                     yield sequence_uuid, UploadResult(error=ex)
@@ -384,7 +389,6 @@ def __init__(
     def upload_stream(
         self,
         fp: T.IO[bytes],
-        cluster_filetype: upload_api_v4.ClusterFileType,
         session_key: str,
         progress: dict[str, T.Any] | None = None,
     ) -> str:
@@ -394,7 +398,7 @@
         fp.seek(0, io.SEEK_END)
         entity_size = fp.tell()
 
-        upload_service = self._create_upload_service(session_key, cluster_filetype)
+        upload_service = self._create_upload_service(session_key)
 
         progress["entity_size"] = entity_size
         progress["chunk_size"] = self.chunk_size
@@ -417,30 +421,20 @@
 
         self.emitter.emit("upload_end", progress)
 
-        # TODO: retry here
-        cluster_id = self._finish_upload_retryable(upload_service, file_handle)
-        progress["cluster_id"] = cluster_id
-
-        self.emitter.emit("upload_finished", progress)
-
-        return cluster_id
+        return file_handle
 
-    def _create_upload_service(
-        self, session_key: str, cluster_filetype: upload_api_v4.ClusterFileType
-    ) -> upload_api_v4.UploadService:
+    def _create_upload_service(self, session_key: str) -> upload_api_v4.UploadService:
         upload_service: upload_api_v4.UploadService
 
         if self.dry_run:
             upload_service = upload_api_v4.FakeUploadService(
                 user_access_token=self.user_items["user_upload_token"],
                 session_key=session_key,
-                cluster_filetype=cluster_filetype,
             )
         else:
             upload_service = upload_api_v4.UploadService(
                 user_access_token=self.user_items["user_upload_token"],
                 session_key=session_key,
-                cluster_filetype=cluster_filetype,
             )
 
         return upload_service
@@ -517,10 +511,15 @@
 
         return upload_service.upload_shifted_chunks(shifted_chunks, begin_offset)
 
-    def _finish_upload_retryable(
-        self, upload_service: upload_api_v4.UploadService, file_handle: str
+    def finish_upload(
+        self,
+        file_handle: str,
+        cluster_filetype: api_v4.ClusterFileType,
+        progress: dict[str, T.Any] | None = None,
     ) -> str:
         """Finish upload with safe retries guaranteed"""
+        if progress is None:
+            progress = {}
 
         if self.dry_run:
             cluster_id = "0"
         else:
             resp = api_v4.finish_upload(
                 self.user_items["user_upload_token"],
file_handle, - upload_service.cluster_filetype, + cluster_filetype, organization_id=self.user_items.get("MAPOrganizationKey"), ) @@ -537,6 +536,9 @@ def _finish_upload_retryable( # TODO: validate cluster_id + progress["cluster_id"] = cluster_id + self.emitter.emit("upload_finished", progress) + return cluster_id @@ -580,14 +582,12 @@ def _is_retriable_exception(ex: Exception): return False -_SUFFIX_MAP: dict[upload_api_v4.ClusterFileType, str] = { - upload_api_v4.ClusterFileType.ZIP: ".zip", - upload_api_v4.ClusterFileType.CAMM: ".mp4", - upload_api_v4.ClusterFileType.BLACKVUE: ".mp4", +_SUFFIX_MAP: dict[api_v4.ClusterFileType, str] = { + api_v4.ClusterFileType.ZIP: ".zip", + api_v4.ClusterFileType.CAMM: ".mp4", + api_v4.ClusterFileType.BLACKVUE: ".mp4", } -def _session_key( - upload_md5sum: str, cluster_filetype: upload_api_v4.ClusterFileType -) -> str: +def _session_key(upload_md5sum: str, cluster_filetype: api_v4.ClusterFileType) -> str: return f"mly_tools_{upload_md5sum}{_SUFFIX_MAP[cluster_filetype]}" From b49edfe399aa61ced23ecf45138d577e3647fa5b Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Tue, 10 Jun 2025 13:19:03 -0700 Subject: [PATCH 02/15] fix tests --- mapillary_tools/upload.py | 1 - tests/unit/test_upload_api_v4.py | 3 --- tests/unit/test_uploader.py | 10 ++++++---- 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/mapillary_tools/upload.py b/mapillary_tools/upload.py index 8dfb26bda..2c7c0a7c6 100644 --- a/mapillary_tools/upload.py +++ b/mapillary_tools/upload.py @@ -22,7 +22,6 @@ ipc, telemetry, types, - upload_api_v4, uploader, utils, VERSION, diff --git a/tests/unit/test_upload_api_v4.py b/tests/unit/test_upload_api_v4.py index d8d8e0ac3..3857f522f 100644 --- a/tests/unit/test_upload_api_v4.py +++ b/tests/unit/test_upload_api_v4.py @@ -11,7 +11,6 @@ def test_upload(setup_upload: py.path.local): upload_service = upload_api_v4.FakeUploadService( user_access_token="TEST", session_key="FOOBAR.txt", - cluster_filetype=upload_api_v4.ClusterFileType.ZIP, ) upload_service._error_ratio = 0 content = b"double_foobar" @@ -28,7 +27,6 @@ def test_upload_big_chunksize(setup_upload: py.path.local): upload_service = upload_api_v4.FakeUploadService( user_access_token="TEST", session_key="FOOBAR.txt", - cluster_filetype=upload_api_v4.ClusterFileType.ZIP, ) upload_service._error_ratio = 0 content = b"double_foobar" @@ -45,7 +43,6 @@ def test_upload_chunks(setup_upload: py.path.local): upload_service = upload_api_v4.FakeUploadService( user_access_token="TEST", session_key="FOOBAR2.txt", - cluster_filetype=upload_api_v4.ClusterFileType.ZIP, ) upload_service._error_ratio = 0 diff --git a/tests/unit/test_uploader.py b/tests/unit/test_uploader.py index 4e633019d..1e53e6f68 100644 --- a/tests/unit/test_uploader.py +++ b/tests/unit/test_uploader.py @@ -8,7 +8,7 @@ import pytest -from mapillary_tools import types, upload_api_v4, uploader, utils +from mapillary_tools import types, upload_api_v4, uploader, utils, api_v4 from ..integration.fixtures import setup_upload, validate_and_extract_zip @@ -211,12 +211,14 @@ def test_upload_blackvue( with open(blackvue_path, "wb") as fp: fp.write(b"this is a fake video") with Path(blackvue_path).open("rb") as fp: - resp = mly_uploader.upload_stream( + file_handle = mly_uploader.upload_stream( fp, - upload_api_v4.ClusterFileType.BLACKVUE, "this_is_a_blackvue.mp4", ) - assert resp == "0" + cluster_id = mly_uploader.finish_upload( + file_handle, api_v4.ClusterFileType.BLACKVUE + ) + assert cluster_id == "0" for mp4_path in setup_upload.listdir(): assert 
os.path.basename(mp4_path) == "this_is_a_blackvue.mp4" with open(mp4_path, "rb") as fp: From 45c0fa4c01ebe24d87197b80bf2e778bb235d37f Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Tue, 10 Jun 2025 13:19:33 -0700 Subject: [PATCH 03/15] sort --- tests/unit/test_uploader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/test_uploader.py b/tests/unit/test_uploader.py index 1e53e6f68..71f21c87f 100644 --- a/tests/unit/test_uploader.py +++ b/tests/unit/test_uploader.py @@ -8,7 +8,7 @@ import pytest -from mapillary_tools import types, upload_api_v4, uploader, utils, api_v4 +from mapillary_tools import api_v4, types, upload_api_v4, uploader, utils from ..integration.fixtures import setup_upload, validate_and_extract_zip From 02b3dee9fffa82dd16338e86d7e35e19d5344128 Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Tue, 10 Jun 2025 15:41:06 -0700 Subject: [PATCH 04/15] fix tests --- mapillary_tools/uploader.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mapillary_tools/uploader.py b/mapillary_tools/uploader.py index 069436328..eeb6542a7 100644 --- a/mapillary_tools/uploader.py +++ b/mapillary_tools/uploader.py @@ -329,8 +329,6 @@ def zip_images_and_upload( "file_type": types.FileType.IMAGE.value, } - mutable_progress: dict[str, T.Any] = {**progress, **sequence_progress} - try: _validate_metadatas(sequence) except Exception as ex: @@ -353,6 +351,8 @@ def zip_images_and_upload( upload_md5sum, api_v4.ClusterFileType.ZIP ) + mutable_progress: dict[str, T.Any] = {**progress, **sequence_progress} + try: file_handle = uploader.upload_stream( fp, upload_session_key, progress=mutable_progress From 078f5cef2a1293b5569d81939fe496f8977c3f54 Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Tue, 10 Jun 2025 17:26:21 -0700 Subject: [PATCH 05/15] refactor --- mapillary_tools/uploader.py | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/mapillary_tools/uploader.py b/mapillary_tools/uploader.py index eeb6542a7..fda64dc20 100644 --- a/mapillary_tools/uploader.py +++ b/mapillary_tools/uploader.py @@ -208,8 +208,10 @@ def zip_sequence_fp( with zipfile.ZipFile(zip_fp, "w", zipfile.ZIP_DEFLATED) as zipf: for idx, metadata in enumerate(sequence): - # Arcname does not matter, but it should be unique - cls._write_imagebytes_in_zip(zipf, metadata, arcname=f"{idx}.jpg") + # Arcname should be unique, the name does not matter + arcname = f"{idx}.jpg" + zipinfo = zipfile.ZipInfo(arcname, date_time=(1980, 1, 1, 0, 0, 0)) + zipf.writestr(zipinfo, cls._dump_image_bytes(metadata)) assert len(sequence) == len(set(zipf.namelist())) zipf.comment = json.dumps({"sequence_md5sum": sequence_md5sum}).encode( "utf-8" @@ -241,9 +243,7 @@ def extract_sequence_md5sum(cls, zip_fp: T.IO[bytes]) -> str: return sequence_md5sum @classmethod - def _write_imagebytes_in_zip( - cls, zipf: zipfile.ZipFile, metadata: types.ImageMetadata, arcname: str - ): + def _dump_image_bytes(cls, metadata: types.ImageMetadata) -> bytes: try: edit = exif_write.ExifEdit(metadata.filename) except struct.error as ex: @@ -255,15 +255,12 @@ def _write_imagebytes_in_zip( ) try: - image_bytes = edit.dump_image_bytes() + return edit.dump_image_bytes() except struct.error as ex: raise ExifError( f"Failed to dump EXIF bytes: {ex}", metadata.filename ) from ex - zipinfo = zipfile.ZipInfo(arcname, date_time=(1980, 1, 1, 0, 0, 0)) - zipf.writestr(zipinfo, image_bytes) - @classmethod def upload_zipfile( cls, From f2c1af74d5ecc9eb937cdb6f088c705fe80b3b70 Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Fri, 
13 Jun 2025 10:23:20 -0700 Subject: [PATCH 06/15] wip --- mapillary_tools/api_v4.py | 1 + mapillary_tools/upload.py | 61 ++++++++++++++-- mapillary_tools/upload_api_v4.py | 19 +++-- mapillary_tools/uploader.py | 117 ++++++++++++++++++++++++++++-- tests/integration/fixtures.py | 37 ++++++++++ tests/integration/test_history.py | 9 ++- tests/integration/test_upload.py | 57 +++++---------- 7 files changed, 240 insertions(+), 61 deletions(-) diff --git a/mapillary_tools/api_v4.py b/mapillary_tools/api_v4.py index 4511f2088..317aef850 100644 --- a/mapillary_tools/api_v4.py +++ b/mapillary_tools/api_v4.py @@ -25,6 +25,7 @@ class ClusterFileType(enum.Enum): ZIP = "zip" BLACKVUE = "mly_blackvue_video" CAMM = "mly_camm_video" + MLY_BUNDLE_MANIFEST = "mly_bundle_manifest" class HTTPSystemCertsAdapter(HTTPAdapter): diff --git a/mapillary_tools/upload.py b/mapillary_tools/upload.py index 2c7c0a7c6..1bf278e35 100644 --- a/mapillary_tools/upload.py +++ b/mapillary_tools/upload.py @@ -191,10 +191,50 @@ def write_history(payload: uploader.Progress): def _setup_tdqm(emitter: uploader.EventEmitter) -> None: upload_pbar: tqdm | None = None + @emitter.on("upload_start") + def upload_start_for_images(payload: uploader.Progress) -> None: + nonlocal upload_pbar + + if payload.get("file_type") != FileType.IMAGE.value: + return + + if upload_pbar is not None: + upload_pbar.close() + + nth = payload["sequence_idx"] + 1 + total = payload["total_sequence_count"] + filetype = payload.get("file_type", "unknown").upper() + import_path: str | None = payload.get("import_path") + if import_path is None: + desc = f"Uploading {filetype} ({nth}/{total})" + else: + desc = ( + f"Uploading {filetype} {os.path.basename(import_path)} ({nth}/{total})" + ) + upload_pbar = tqdm( + total=payload["sequence_image_count"], + desc=desc, + unit="B", + unit_scale=True, + unit_divisor=1024, + initial=0, + disable=LOG.getEffectiveLevel() <= logging.DEBUG, + ) + + @emitter.on("upload_progress") + def upload_progress_for_images(payload: uploader.Progress) -> None: + if payload.get("file_type") != FileType.IMAGE.value: + return + assert upload_pbar is not None, "progress_bar must be initialized" + upload_pbar.update() + @emitter.on("upload_fetch_offset") def upload_fetch_offset(payload: uploader.Progress) -> None: nonlocal upload_pbar + if payload.get("file_type") == FileType.IMAGE.value: + return + if upload_pbar is not None: upload_pbar.close() @@ -203,14 +243,14 @@ def upload_fetch_offset(payload: uploader.Progress) -> None: import_path: str | None = payload.get("import_path") filetype = payload.get("file_type", "unknown").upper() if import_path is None: - _desc = f"Uploading {filetype} ({nth}/{total})" + desc = f"Uploading {filetype} ({nth}/{total})" else: - _desc = ( + desc = ( f"Uploading {filetype} {os.path.basename(import_path)} ({nth}/{total})" ) upload_pbar = tqdm( total=payload["entity_size"], - desc=_desc, + desc=desc, unit="B", unit_scale=True, unit_divisor=1024, @@ -220,6 +260,8 @@ def upload_fetch_offset(payload: uploader.Progress) -> None: @emitter.on("upload_progress") def upload_progress(payload: uploader.Progress) -> None: + if payload.get("file_type") == FileType.IMAGE.value: + return assert upload_pbar is not None, "progress_bar must be initialized" upload_pbar.update(payload["chunk_size"]) @@ -294,8 +336,13 @@ def _setup_api_stats(emitter: uploader.EventEmitter): @emitter.on("upload_start") def collect_start_time(payload: _APIStats) -> None: - payload["upload_start_time"] = time.time() + now = time.time() + 
payload["upload_start_time"] = now
         payload["upload_total_time"] = 0
+        # These fields should be initialized by upload events such as "upload_fetch_offset",
+        # but since those events are disabled for image uploads, we initialize them here
+        payload["upload_last_restart_time"] = now
+        payload["upload_first_offset"] = 0
 
     @emitter.on("upload_fetch_offset")
     def collect_restart_time(payload: _APIStats) -> None:
@@ -465,7 +512,7 @@ def _gen_upload_everything(
         (m for m in metadatas if isinstance(m, types.ImageMetadata)),
         utils.find_images(import_paths, skip_subfolders=skip_subfolders),
     )
-    for image_result in uploader.ZipImageSequence.zip_images_and_upload(
+    for image_result in uploader.ZipImageSequence.upload_images(
         mly_uploader,
         image_metadatas,
     ):
@@ -709,7 +756,9 @@ def upload(
 
     finally:
         # We collected stats after every upload is finished
-        assert upload_successes == len(stats)
+        assert upload_successes == len(stats), (
+            f"Expected {upload_successes} successes but got {stats}"
+        )
 
         _show_upload_summary(stats, upload_errors)
 
diff --git a/mapillary_tools/upload_api_v4.py b/mapillary_tools/upload_api_v4.py
index 276afc64c..fe1b2eb66 100644
--- a/mapillary_tools/upload_api_v4.py
+++ b/mapillary_tools/upload_api_v4.py
@@ -4,6 +4,7 @@
 import os
 import random
 import sys
+from pathlib import Path
 import typing as T
 import uuid
 
@@ -138,8 +139,8 @@ class FakeUploadService(UploadService):
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
-        self._upload_path = os.getenv(
-            "MAPILLARY_UPLOAD_PATH", "mapillary_public_uploads"
+        self._upload_path = Path(
+            os.getenv("MAPILLARY_UPLOAD_PATH", "mapillary_public_uploads")
         )
         self._error_ratio = 0.02
 
@@ -156,8 +157,8 @@ def upload_shifted_chunks(
             )
 
         os.makedirs(self._upload_path, exist_ok=True)
-        filename = os.path.join(self._upload_path, self.session_key)
-        with open(filename, "ab") as fp:
+        filename = self._upload_path.joinpath(self.session_key)
+        with filename.open("ab") as fp:
             for chunk in shifted_chunks:
                 if random.random() <= self._error_ratio:
                     raise requests.ConnectionError(
@@ -168,7 +169,15 @@ def upload_shifted_chunks(
                     raise requests.ConnectionError(
                         f"TEST ONLY: Partially uploaded with error ratio {self._error_ratio}"
                     )
-        return uuid.uuid4().hex
+
+        file_handle_dir = self._upload_path.joinpath("file_handles")
+        file_handle_path = file_handle_dir.joinpath(self.session_key)
+        if not file_handle_path.exists():
+            os.makedirs(file_handle_dir, exist_ok=True)
+            random_file_handle = uuid.uuid4().hex
+            file_handle_path.write_text(random_file_handle)
+
+        return file_handle_path.read_text()
 
     @override
     def fetch_offset(self) -> int:
diff --git a/mapillary_tools/uploader.py b/mapillary_tools/uploader.py
index fda64dc20..1b667c11a 100644
--- a/mapillary_tools/uploader.py
+++ b/mapillary_tools/uploader.py
@@ -125,6 +125,7 @@ def __init__(self):
     def on(self, event: EventName):
         def _wrap(callback):
             self.events.setdefault(event, []).append(callback)
+            return callback
 
         return _wrap
 
@@ -323,7 +324,7 @@ def zip_images_and_upload(
                 "total_sequence_count": len(sequences),
                 "sequence_image_count": len(sequence),
                 "sequence_uuid": sequence_uuid,
-                "file_type": types.FileType.IMAGE.value,
+                "file_type": types.FileType.ZIP.value,
             }
 
             try:
@@ -365,6 +366,103 @@
 
             yield sequence_uuid, UploadResult(result=cluster_id)
 
+    @classmethod
+    def _upload_sequence(
+        cls,
+        uploader: Uploader,
+        sequence: T.Sequence[types.ImageMetadata],
+        progress: dict[str, T.Any] | None = None,
+    ) -> str:
+        if progress is None:
+            progress = {}
+
_validate_metadatas(sequence) + + # TODO: assert sequence is sorted + + # FIXME: This is a hack to disable the event emitter inside the uploader + uploader.emittion_disabled = True + + uploader.emitter.emit("upload_start", progress) + + image_file_handles: list[str] = [] + total_bytes = 0 + for image_metadata in sequence: + mutable_progress: dict[str, T.Any] = { + **progress, + "filename": str(image_metadata.filename), + } + + bytes = cls._dump_image_bytes(image_metadata) + total_bytes += len(bytes) + upload_md5sum = utils.md5sum_fp(io.BytesIO(bytes)).hexdigest() + file_handle = uploader.upload_stream( + io.BytesIO(bytes), f"{upload_md5sum}.jpg" + ) + image_file_handles.append(file_handle) + + uploader.emitter.emit("upload_progress", mutable_progress) + + manifest = { + "version": "1", + "upload_type": "images", + "image_handles": image_file_handles, + } + + with io.BytesIO() as manifest_fp: + manifest_fp.write(json.dumps(manifest).encode("utf-8")) + manifest_fp.seek(0, io.SEEK_SET) + manifest_file_handle = uploader.upload_stream(manifest_fp, f"{uuid.uuid4().hex}.json") + + progress["entity_size"] = total_bytes + uploader.emitter.emit("upload_end", progress) + + # FIXME: This is a hack to disable the event emitter inside the uploader + uploader.emittion_disabled = False + + cluster_id = uploader.finish_upload( + manifest_file_handle, + api_v4.ClusterFileType.MLY_BUNDLE_MANIFEST, + progress=progress, + ) + + return cluster_id + + @classmethod + def upload_images( + cls, + uploader: Uploader, + image_metadatas: T.Sequence[types.ImageMetadata], + progress: dict[str, T.Any] | None = None, + ) -> T.Generator[tuple[str, UploadResult], None, None]: + if progress is None: + progress = {} + + sequences = types.group_and_sort_images(image_metadatas) + + for sequence_idx, (sequence_uuid, sequence) in enumerate(sequences.items()): + sequence_md5sum = types.update_sequence_md5sum(sequence) + + sequence_progress: SequenceProgress = { + "sequence_idx": sequence_idx, + "total_sequence_count": len(sequences), + "sequence_image_count": len(sequence), + "sequence_uuid": sequence_uuid, + "file_type": types.FileType.IMAGE.value, + "sequence_md5sum": sequence_md5sum, + } + + mutable_progress: dict[str, T.Any] = {**progress, **sequence_progress} + + try: + cluster_id = cls._upload_sequence( + uploader, sequence, progress=mutable_progress + ) + except Exception as ex: + yield sequence_uuid, UploadResult(error=ex) + else: + yield sequence_uuid, UploadResult(result=cluster_id) + class Uploader: def __init__( @@ -375,6 +473,7 @@ def __init__( dry_run=False, ): self.user_items = user_items + self.emittion_disabled = False if emitter is None: # An empty event emitter that does nothing self.emitter = EventEmitter() @@ -402,7 +501,7 @@ def upload_stream( progress["retries"] = 0 progress["begin_offset"] = None - self.emitter.emit("upload_start", progress) + self._maybe_emit("upload_start", progress) while True: try: @@ -416,7 +515,7 @@ def upload_stream( progress["retries"] += 1 - self.emitter.emit("upload_end", progress) + self._maybe_emit("upload_end", progress) return file_handle @@ -444,7 +543,7 @@ def _handle_upload_exception( chunk_size = progress["chunk_size"] if retries <= constants.MAX_UPLOAD_RETRIES and _is_retriable_exception(ex): - self.emitter.emit("upload_interrupted", progress) + self._maybe_emit("upload_interrupted", progress) LOG.warning( # use %s instead of %d because offset could be None "Error uploading chunk_size %d at begin_offset %s: %s: %s", @@ -485,7 +584,7 @@ def _chunk_with_progress_emitted( # 
Whenever a chunk is uploaded, reset retries progress["retries"] = 0 - self.emitter.emit("upload_progress", progress) + self._maybe_emit("upload_progress", progress) def _upload_stream_retryable( self, @@ -500,7 +599,7 @@ def _upload_stream_retryable( progress["begin_offset"] = begin_offset progress["offset"] = begin_offset - self.emitter.emit("upload_fetch_offset", progress) + self._maybe_emit("upload_fetch_offset", progress) fp.seek(begin_offset, io.SEEK_SET) @@ -534,10 +633,14 @@ def finish_upload( # TODO: validate cluster_id progress["cluster_id"] = cluster_id - self.emitter.emit("upload_finished", progress) + self._maybe_emit("upload_finished", progress) return cluster_id + def _maybe_emit(self, event: EventName, progress: dict[str, T.Any]): + if not self.emittion_disabled: + return self.emitter.emit(event, progress) + def _validate_metadatas(metadatas: T.Sequence[types.ImageMetadata]): for metadata in metadatas: diff --git a/tests/integration/fixtures.py b/tests/integration/fixtures.py index 71ccb8117..34a162e98 100644 --- a/tests/integration/fixtures.py +++ b/tests/integration/fixtures.py @@ -1,3 +1,4 @@ +import hashlib import json import os import shutil @@ -291,3 +292,39 @@ def verify_descs(expected: T.List[T.Dict], actual: T.Union[Path, T.List[T.Dict]] if "MAPDeviceModel" in expected_desc: assert expected_desc["MAPDeviceModel"] == actual_desc["MAPDeviceModel"] + + +def validate_uploaded_images(upload_folder: Path): + session_by_file_handle: dict[str, str] = {} + for session_path in upload_folder.joinpath("file_handles").iterdir(): + file_handle = session_path.read_text() + session_by_file_handle[file_handle] = session_path.name + + sequence_paths = [] + for file in upload_folder.iterdir(): + if file.suffix == ".json": + with file.open() as fp: + manifest = json.load(fp) + image_file_handles = manifest["image_handles"] + sequence_paths.append( + [ + upload_folder.joinpath(session_by_file_handle[file_handle]) + for file_handle in image_file_handles + ] + ) + + return [ + [validate_and_extract_image(str(path)) for path in paths] + for paths in sequence_paths + ] + + +def file_md5sum(path) -> str: + with open(path, "rb") as fp: + md5 = hashlib.md5() + while True: + buf = fp.read(1024 * 1024 * 32) + if not buf: + break + md5.update(buf) + return md5.hexdigest() diff --git a/tests/integration/test_history.py b/tests/integration/test_history.py index 0b7060fab..62cd8d6ff 100644 --- a/tests/integration/test_history.py +++ b/tests/integration/test_history.py @@ -47,18 +47,19 @@ def test_upload_gopro( shell=True, ) assert x.returncode == 0, x.stderr - assert len(setup_upload.listdir()) == 1, ( + assert len(setup_upload.listdir()) == 2, ( f"should be uploaded for the first time but got {setup_upload.listdir()}" ) for upload in setup_upload.listdir(): - upload.remove() - assert len(setup_upload.listdir()) == 0 + if upload.basename != "file_handles": + upload.remove() + assert len(setup_upload.listdir()) == 1 x = subprocess.run( f"{EXECUTABLE} process_and_upload --skip_process_errors {UPLOAD_FLAGS} {str(video_dir)}", shell=True, ) assert x.returncode == 0, x.stderr - assert len(setup_upload.listdir()) == 0, ( + assert len(setup_upload.listdir()) == 1, ( "should NOT upload because it is uploaded already" ) diff --git a/tests/integration/test_upload.py b/tests/integration/test_upload.py index 97cb40f74..753622d52 100644 --- a/tests/integration/test_upload.py +++ b/tests/integration/test_upload.py @@ -1,6 +1,4 @@ -import hashlib import json -import os import subprocess from pathlib import Path @@ 
-13,7 +11,7 @@ setup_data, setup_upload, USERNAME, - validate_and_extract_zip, + validate_uploaded_images, ) @@ -21,41 +19,33 @@ UPLOAD_FLAGS = f"--dry_run --user_name={USERNAME}" -def file_md5sum(path) -> str: - with open(path, "rb") as fp: - md5 = hashlib.md5() - while True: - buf = fp.read(1024 * 1024 * 32) - if not buf: - break - md5.update(buf) - return md5.hexdigest() - - @pytest.mark.usefixtures("setup_config") -def test_upload_image_dir( - setup_data: py.path.local, - setup_upload: py.path.local, -): +def test_upload_image_dir(setup_data: py.path.local, setup_upload: py.path.local): x = subprocess.run( f"{EXECUTABLE} process --file_types=image {PROCESS_FLAGS} {setup_data}", shell=True, ) - assert x.returncode == 0, x.stderr + desc_path = setup_data.join("mapillary_image_description.json") + with open(desc_path) as fp: + descs = json.load(fp) + for desc in descs: + # TODO: check if the descs are valid + pass + x = subprocess.run( f"{EXECUTABLE} process_and_upload {UPLOAD_FLAGS} --file_types=image {setup_data}", shell=True, ) - for file in setup_upload.listdir(): - validate_and_extract_zip(Path(file)) assert x.returncode == 0, x.stderr + descs = validate_uploaded_images(Path(setup_upload)) + for x in descs: + # TODO: check if the descs are valid + pass + @pytest.mark.usefixtures("setup_config") -def test_upload_image_dir_twice( - setup_data: py.path.local, - setup_upload: py.path.local, -): +def test_upload_image_dir_twice(setup_data: py.path.local, setup_upload: py.path.local): x = subprocess.run( f"{EXECUTABLE} process --skip_process_errors {PROCESS_FLAGS} {setup_data}", shell=True, @@ -63,17 +53,13 @@ def test_upload_image_dir_twice( assert x.returncode == 0, x.stderr desc_path = setup_data.join("mapillary_image_description.json") - md5sum_map = {} - # first upload x = subprocess.run( f"{EXECUTABLE} process_and_upload {UPLOAD_FLAGS} --file_types=image {setup_data}", shell=True, ) assert x.returncode == 0, x.stderr - for file in setup_upload.listdir(): - validate_and_extract_zip(Path(file)) - md5sum_map[os.path.basename(file)] = file_md5sum(file) + validate_uploaded_images(Path(setup_upload)) # expect the second upload to not produce new uploads x = subprocess.run( @@ -81,18 +67,11 @@ def test_upload_image_dir_twice( shell=True, ) assert x.returncode == 0, x.stderr - for file in setup_upload.listdir(): - validate_and_extract_zip(Path(file)) - new_md5sum = file_md5sum(file) - assert md5sum_map[os.path.basename(file)] == new_md5sum - assert len(md5sum_map) == len(setup_upload.listdir()) + validate_uploaded_images(Path(setup_upload)) @pytest.mark.usefixtures("setup_config") -def test_upload_wrong_descs( - setup_data: py.path.local, - setup_upload: py.path.local, -): +def test_upload_wrong_descs(setup_data: py.path.local, setup_upload: py.path.local): x = subprocess.run( f"{EXECUTABLE} process --skip_process_errors {PROCESS_FLAGS} {setup_data}", shell=True, From 2b8ff25fd03b03a443075ab2294eb7a6407cbec1 Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Tue, 17 Jun 2025 16:21:08 -0700 Subject: [PATCH 07/15] fix tests --- mapillary_tools/upload.py | 6 - mapillary_tools/upload_api_v4.py | 2 +- mapillary_tools/uploader.py | 64 ++--- tests/integration/fixtures.py | 266 +++++++++++-------- tests/integration/test_gopro.py | 28 +- tests/integration/test_process.py | 58 ++-- tests/integration/test_process_and_upload.py | 79 ++---- tests/integration/test_upload.py | 53 ++-- tests/unit/test_uploader.py | 64 ++--- 9 files changed, 313 insertions(+), 307 deletions(-) diff --git 
a/mapillary_tools/upload.py b/mapillary_tools/upload.py index 1bf278e35..ae28205b6 100644 --- a/mapillary_tools/upload.py +++ b/mapillary_tools/upload.py @@ -556,13 +556,8 @@ def _gen_upload_videos( "file_type": video_metadata.filetype.value, "import_path": str(video_metadata.filename), "sequence_md5sum": video_metadata.md5sum, - "upload_md5sum": video_metadata.md5sum, } - session_key = uploader._session_key( - video_metadata.md5sum, api_v4.ClusterFileType.CAMM - ) - try: with video_metadata.filename.open("rb") as src_fp: # Build the mp4 stream with the CAMM samples @@ -573,7 +568,6 @@ def _gen_upload_videos( # Upload the mp4 stream file_handle = mly_uploader.upload_stream( T.cast(T.IO[bytes], camm_fp), - session_key, progress=T.cast(T.Dict[str, T.Any], progress), ) cluster_id = mly_uploader.finish_upload( diff --git a/mapillary_tools/upload_api_v4.py b/mapillary_tools/upload_api_v4.py index fe1b2eb66..57afbc7d3 100644 --- a/mapillary_tools/upload_api_v4.py +++ b/mapillary_tools/upload_api_v4.py @@ -4,9 +4,9 @@ import os import random import sys -from pathlib import Path import typing as T import uuid +from pathlib import Path if sys.version_info >= (3, 12): from typing import override diff --git a/mapillary_tools/uploader.py b/mapillary_tools/uploader.py index 1b667c11a..d153616ce 100644 --- a/mapillary_tools/uploader.py +++ b/mapillary_tools/uploader.py @@ -280,25 +280,17 @@ def upload_zipfile( with zip_path.open("rb") as zip_fp: sequence_md5sum = cls.extract_sequence_md5sum(zip_fp) - with zip_path.open("rb") as zip_fp: - upload_md5sum = utils.md5sum_fp(zip_fp).hexdigest() - sequence_progress: SequenceProgress = { "sequence_image_count": len(namelist), "file_type": types.FileType.ZIP.value, "sequence_md5sum": sequence_md5sum, - "upload_md5sum": upload_md5sum, } # Send the copy of the input progress to each upload session, to avoid modifying the original one mutable_progress: dict[str, T.Any] = {**progress, **sequence_progress} - upload_session_key = _session_key(upload_md5sum, api_v4.ClusterFileType.ZIP) - with zip_path.open("rb") as zip_fp: - file_handle = uploader.upload_stream( - zip_fp, upload_session_key, progress=mutable_progress - ) + file_handle = uploader.upload_stream(zip_fp, progress=mutable_progress) cluster_id = uploader.finish_upload( file_handle, api_v4.ClusterFileType.ZIP, progress=mutable_progress @@ -342,19 +334,10 @@ def zip_images_and_upload( sequence_progress["sequence_md5sum"] = sequence_md5sum - fp.seek(0, io.SEEK_SET) - upload_md5sum = utils.md5sum_fp(fp).hexdigest() - - upload_session_key = _session_key( - upload_md5sum, api_v4.ClusterFileType.ZIP - ) - mutable_progress: dict[str, T.Any] = {**progress, **sequence_progress} try: - file_handle = uploader.upload_stream( - fp, upload_session_key, progress=mutable_progress - ) + file_handle = uploader.upload_stream(fp, progress=mutable_progress) cluster_id = uploader.finish_upload( file_handle, api_v4.ClusterFileType.ZIP, @@ -395,9 +378,8 @@ def _upload_sequence( bytes = cls._dump_image_bytes(image_metadata) total_bytes += len(bytes) - upload_md5sum = utils.md5sum_fp(io.BytesIO(bytes)).hexdigest() file_handle = uploader.upload_stream( - io.BytesIO(bytes), f"{upload_md5sum}.jpg" + io.BytesIO(bytes), progress=mutable_progress ) image_file_handles.append(file_handle) @@ -412,7 +394,9 @@ def _upload_sequence( with io.BytesIO() as manifest_fp: manifest_fp.write(json.dumps(manifest).encode("utf-8")) manifest_fp.seek(0, io.SEEK_SET) - manifest_file_handle = uploader.upload_stream(manifest_fp, f"{uuid.uuid4().hex}.json") + 
manifest_file_handle = uploader.upload_stream( + manifest_fp, session_key=f"{uuid.uuid4().hex}.json" + ) progress["entity_size"] = total_bytes uploader.emitter.emit("upload_end", progress) @@ -485,12 +469,21 @@ def __init__( def upload_stream( self, fp: T.IO[bytes], - session_key: str, + session_key: str | None = None, progress: dict[str, T.Any] | None = None, ) -> str: if progress is None: progress = {} + if session_key is None: + fp.seek(0, io.SEEK_SET) + md5sum = utils.md5sum_fp(fp).hexdigest() + filetype = progress.get("file_type") + if filetype is not None: + session_key = _session_key(md5sum, types.FileType(filetype)) + else: + session_key = md5sum + fp.seek(0, io.SEEK_END) entity_size = fp.tell() @@ -682,12 +675,19 @@ def _is_retriable_exception(ex: Exception): return False -_SUFFIX_MAP: dict[api_v4.ClusterFileType, str] = { - api_v4.ClusterFileType.ZIP: ".zip", - api_v4.ClusterFileType.CAMM: ".mp4", - api_v4.ClusterFileType.BLACKVUE: ".mp4", -} - - -def _session_key(upload_md5sum: str, cluster_filetype: api_v4.ClusterFileType) -> str: - return f"mly_tools_{upload_md5sum}{_SUFFIX_MAP[cluster_filetype]}" +def _session_key( + upload_md5sum: str, filetype: api_v4.ClusterFileType | types.FileType +) -> str: + _SUFFIX_MAP: dict[api_v4.ClusterFileType | types.FileType, str] = { + api_v4.ClusterFileType.ZIP: ".zip", + api_v4.ClusterFileType.CAMM: ".mp4", + api_v4.ClusterFileType.BLACKVUE: ".mp4", + types.FileType.IMAGE: ".jpg", + types.FileType.ZIP: ".zip", + types.FileType.BLACKVUE: ".mp4", + types.FileType.CAMM: ".mp4", + types.FileType.GOPRO: ".mp4", + types.FileType.VIDEO: ".mp4", + } + + return f"mly_tools_{upload_md5sum}{_SUFFIX_MAP[filetype]}" diff --git a/tests/integration/fixtures.py b/tests/integration/fixtures.py index 34a162e98..807ab32ce 100644 --- a/tests/integration/fixtures.py +++ b/tests/integration/fixtures.py @@ -1,10 +1,10 @@ -import hashlib +from __future__ import annotations + import json import os import shutil import subprocess import tempfile -import typing as T import zipfile from pathlib import Path @@ -160,52 +160,59 @@ def run_exiftool_and_generate_geotag_args( with open("schema/image_description_schema.json") as fp: - image_description_schema = json.load(fp) + IMAGE_DESCRIPTION_SCHEMA = json.load(fp) -def validate_and_extract_image(image_path: str): - with open(image_path, "rb") as fp: +def validate_and_extract_image(image_path: Path): + with image_path.open("rb") as fp: tags = exifread.process_file(fp) desc_tag = tags.get("Image ImageDescription") assert desc_tag is not None, (tags, image_path) desc = json.loads(str(desc_tag.values)) - desc["filename"] = image_path + desc["filename"] = str(image_path) desc["filetype"] = "image" - jsonschema.validate(desc, image_description_schema) + jsonschema.validate(desc, IMAGE_DESCRIPTION_SCHEMA) return desc -def validate_and_extract_zip(zip_path: Path) -> T.List[T.Dict]: +def validate_and_extract_zip(zip_path: Path) -> list[dict]: + with zip_path.open("rb") as fp: + upload_md5sum = utils.md5sum_fp(fp).hexdigest() + + assert f"mly_tools_{upload_md5sum}.zip" == zip_path.name, ( + zip_path.name, + upload_md5sum, + ) + descs = [] with zipfile.ZipFile(zip_path) as zipf: - _sequence_md5sum = json.loads(zipf.comment)["sequence_md5sum"] with tempfile.TemporaryDirectory() as tempdir: zipf.extractall(path=tempdir) for name in os.listdir(tempdir): filename = os.path.join(tempdir, name) - desc = validate_and_extract_image(filename) + desc = validate_and_extract_image(Path(filename)) descs.append(desc) - with zip_path.open("rb") as 
fp: + return descs + + +def validate_and_extract_camm(video_path: Path) -> list[dict]: + with video_path.open("rb") as fp: upload_md5sum = utils.md5sum_fp(fp).hexdigest() - assert f"mly_tools_{upload_md5sum}.zip" == zip_path.name, ( - zip_path.name, + assert f"mly_tools_{upload_md5sum}.mp4" == video_path.name, ( + video_path.name, upload_md5sum, ) - return descs - - -def validate_and_extract_camm(filename: str) -> T.List[T.Dict]: if not IS_FFMPEG_INSTALLED: return [] with tempfile.TemporaryDirectory() as tempdir: x = subprocess.run( - f"{EXECUTABLE} --verbose video_process --video_sample_interval=2 --video_sample_distance=-1 --geotag_source=camm {filename} {tempdir}", + f"{EXECUTABLE} --verbose video_process --video_sample_interval=2 --video_sample_distance=-1 --geotag_source=camm {str(video_path)} {tempdir}", shell=True, ) assert x.returncode == 0, x.stderr @@ -223,108 +230,135 @@ def validate_and_extract_camm(filename: str) -> T.List[T.Dict]: return json.load(fp) -def verify_descs(expected: T.List[T.Dict], actual: T.Union[Path, T.List[T.Dict]]): - if isinstance(actual, Path): - with actual.open("r") as fp: - actual = json.load(fp) - assert isinstance(actual, list), f"expect a list of descs but got: {actual}" - - expected_map = {desc["filename"]: desc for desc in expected} - assert len(expected) == len(expected_map), expected - - actual_map = {desc["filename"]: desc for desc in actual} - assert len(actual) == len(actual_map), actual - - for filename, expected_desc in expected_map.items(): - actual_desc = actual_map.get(filename) - assert actual_desc is not None, expected_desc - if "error" in expected_desc: - assert expected_desc["error"]["type"] == actual_desc.get("error", {}).get( - "type" - ), f"{expected_desc=} != {actual_desc=}" - if "message" in expected_desc["error"]: - assert ( - expected_desc["error"]["message"] == actual_desc["error"]["message"] - ) - if "filetype" in expected_desc: - assert expected_desc["filetype"] == actual_desc.get("filetype"), actual_desc - - if "MAPCompassHeading" in expected_desc: - e = expected_desc["MAPCompassHeading"] - assert "MAPCompassHeading" in actual_desc, actual_desc - a = actual_desc["MAPCompassHeading"] - assert abs(e["TrueHeading"] - a["TrueHeading"]) < 0.001, ( - f"got {a['TrueHeading']} but expect {e['TrueHeading']} in {filename}" - ) - assert abs(e["MagneticHeading"] - a["MagneticHeading"]) < 0.001, ( - f"got {a['MagneticHeading']} but expect {e['MagneticHeading']} in {filename}" - ) - - if "MAPCaptureTime" in expected_desc: - assert expected_desc["MAPCaptureTime"] == actual_desc["MAPCaptureTime"], ( - f"expect {expected_desc['MAPCaptureTime']} but got {actual_desc['MAPCaptureTime']} in {filename}" - ) - - if "MAPLongitude" in expected_desc: - assert ( - abs(expected_desc["MAPLongitude"] - actual_desc["MAPLongitude"]) - < 0.00001 - ), ( - f"expect {expected_desc['MAPLongitude']} but got {actual_desc['MAPLongitude']} in {filename}" - ) - - if "MAPLatitude" in expected_desc: - assert ( - abs(expected_desc["MAPLatitude"] - actual_desc["MAPLatitude"]) < 0.00001 - ), ( - f"expect {expected_desc['MAPLatitude']} but got {actual_desc['MAPLatitude']} in {filename}" - ) - - if "MAPAltitude" in expected_desc: - assert ( - abs(expected_desc["MAPAltitude"] - actual_desc["MAPAltitude"]) < 0.001 - ), ( - f"expect {expected_desc['MAPAltitude']} but got {actual_desc['MAPAltitude']} in {filename}" - ) - - if "MAPDeviceMake" in expected_desc: - assert expected_desc["MAPDeviceMake"] == actual_desc["MAPDeviceMake"] - - if "MAPDeviceModel" in expected_desc: - 
assert expected_desc["MAPDeviceModel"] == actual_desc["MAPDeviceModel"] - - -def validate_uploaded_images(upload_folder: Path): +def load_descs(descs) -> list: + if isinstance(descs, Path): + with descs.open("r") as fp: + descs = json.load(fp) + assert isinstance(descs, list), f"expect a list of descs but got: {descs}" + return descs + + +def extract_all_uploaded_descs(upload_folder: Path) -> list[list[dict]]: + FILE_HANDLE_DIRNAME = "file_handles" + session_by_file_handle: dict[str, str] = {} - for session_path in upload_folder.joinpath("file_handles").iterdir(): - file_handle = session_path.read_text() - session_by_file_handle[file_handle] = session_path.name + if upload_folder.joinpath(FILE_HANDLE_DIRNAME).exists(): + for session_path in upload_folder.joinpath(FILE_HANDLE_DIRNAME).iterdir(): + file_handle = session_path.read_text() + session_by_file_handle[file_handle] = session_path.name + + sequences = [] - sequence_paths = [] for file in upload_folder.iterdir(): if file.suffix == ".json": with file.open() as fp: manifest = json.load(fp) image_file_handles = manifest["image_handles"] - sequence_paths.append( - [ - upload_folder.joinpath(session_by_file_handle[file_handle]) - for file_handle in image_file_handles - ] - ) - - return [ - [validate_and_extract_image(str(path)) for path in paths] - for paths in sequence_paths - ] - - -def file_md5sum(path) -> str: - with open(path, "rb") as fp: - md5 = hashlib.md5() - while True: - buf = fp.read(1024 * 1024 * 32) - if not buf: - break - md5.update(buf) - return md5.hexdigest() + assert len(image_file_handles) > 0, manifest + image_sequence = [] + for file_handle in image_file_handles: + image_path = upload_folder.joinpath(session_by_file_handle[file_handle]) + with image_path.open("rb") as fp: + upload_md5sum = utils.md5sum_fp(fp).hexdigest() + assert upload_md5sum in image_path.stem, (upload_md5sum, image_path) + image_sequence.append(validate_and_extract_image(image_path)) + sequences.append(image_sequence) + elif file.suffix == ".zip": + sequences.append(validate_and_extract_zip(file)) + elif file.suffix == ".mp4": + sequences.append(validate_and_extract_camm(file)) + elif file.name == FILE_HANDLE_DIRNAME: + # Already processed above + pass + + return sequences + + +def approximate(left: float, right: float, threshold=0.00001): + return abs(left - right) < threshold + + +def assert_compare_image_descs(expected: dict, actual: dict): + jsonschema.validate(expected, IMAGE_DESCRIPTION_SCHEMA) + jsonschema.validate(actual, IMAGE_DESCRIPTION_SCHEMA) + + assert expected.get("MAPFilename"), expected + assert actual.get("MAPFilename"), actual + assert expected.get("MAPFilename") == actual.get("MAPFilename") + + filename = actual.get("MAPFilename") + + if "error" in expected: + assert expected["error"]["type"] == actual.get("error", {}).get("type"), ( + f"{expected=} != {actual=}" + ) + if "message" in expected["error"]: + assert expected["error"]["message"] == actual["error"]["message"] + + if "filetype" in expected: + assert expected["filetype"] == actual.get("filetype"), actual + + if "MAPCompassHeading" in expected: + e = expected["MAPCompassHeading"] + assert "MAPCompassHeading" in actual, actual + a = actual["MAPCompassHeading"] + assert approximate(e["TrueHeading"], a["TrueHeading"], 0.001), ( + f"got {a['TrueHeading']} but expect {e['TrueHeading']} in {filename}" + ) + assert approximate(e["MagneticHeading"], a["MagneticHeading"], 0.001), ( + f"got {a['MagneticHeading']} but expect {e['MagneticHeading']} in {filename}" + ) + + if 
"MAPCaptureTime" in expected: + assert expected["MAPCaptureTime"] == actual["MAPCaptureTime"], ( + f"expect {expected['MAPCaptureTime']} but got {actual['MAPCaptureTime']} in {filename}" + ) + + if "MAPLongitude" in expected: + assert approximate(expected["MAPLongitude"], actual["MAPLongitude"], 0.00001), ( + f"expect {expected['MAPLongitude']} but got {actual['MAPLongitude']} in {filename}" + ) + + if "MAPLatitude" in expected: + assert approximate(expected["MAPLatitude"], actual["MAPLatitude"], 0.00001), ( + f"expect {expected['MAPLatitude']} but got {actual['MAPLatitude']} in {filename}" + ) + + if "MAPAltitude" in expected: + assert approximate(expected["MAPAltitude"], actual["MAPAltitude"], 0.001), ( + f"expect {expected['MAPAltitude']} but got {actual['MAPAltitude']} in {filename}" + ) + + if "MAPDeviceMake" in expected: + assert expected["MAPDeviceMake"] == actual["MAPDeviceMake"] + + if "MAPDeviceModel" in expected: + assert expected["MAPDeviceModel"] == actual["MAPDeviceModel"] + + +def assert_contains_image_descs(haystack: Path | list[dict], needle: Path | list[dict]): + """ + Check if the haystack contains all the descriptions in needle. + """ + + haystack = load_descs(haystack) + needle = load_descs(needle) + + haystack_by_filename = { + desc["MAPFilename"]: desc for desc in haystack if "MAPFilename" in desc + } + + needle_by_filename = { + desc["MAPFilename"]: desc for desc in needle if "MAPFilename" in desc + } + + assert haystack_by_filename.keys() >= needle_by_filename.keys(), ( + f"haystack {list(haystack_by_filename.keys())} does not contain all the keys in needle {list(needle_by_filename.keys())}" + ) + for filename, desc in needle_by_filename.items(): + assert_compare_image_descs(desc, haystack_by_filename[filename]) + + +def assert_same_image_descs(left: Path | list[dict], right: Path | list[dict]): + assert_contains_image_descs(left, right) + assert_contains_image_descs(right, left) diff --git a/tests/integration/test_gopro.py b/tests/integration/test_gopro.py index b02587dd6..8fcbbf121 100644 --- a/tests/integration/test_gopro.py +++ b/tests/integration/test_gopro.py @@ -8,12 +8,12 @@ import pytest from .fixtures import ( + assert_same_image_descs, EXECUTABLE, IS_FFMPEG_INSTALLED, run_exiftool_and_generate_geotag_args, setup_config, setup_upload, - verify_descs, ) @@ -26,6 +26,8 @@ } EXPECTED_DESCS: T.List[T.Any] = [ { + "filename": "hero8.mp4/hero8_NA_000001.jpg", + "filetype": "image", "MAPAltitude": 9540.24, "MAPCaptureTime": "2019_11_18_15_41_12_354", "MAPCompassHeading": { @@ -36,9 +38,11 @@ "MAPLongitude": -129.2943386, "MAPDeviceMake": "GoPro", "MAPDeviceModel": "HERO8 Black", - "filename": "hero8.mp4/hero8_NA_000001.jpg", + "MAPFilename": "hero8_NA_000001.jpg", }, { + "filename": "hero8.mp4/hero8_NA_000002.jpg", + "filetype": "image", "MAPAltitude": 7112.573717404068, "MAPCaptureTime": "2019_11_18_15_41_14_354", "MAPCompassHeading": { @@ -49,9 +53,11 @@ "MAPLongitude": -126.85929159704702, "MAPDeviceMake": "GoPro", "MAPDeviceModel": "HERO8 Black", - "filename": "hero8.mp4/hero8_NA_000002.jpg", + "MAPFilename": "hero8_NA_000002.jpg", }, { + "filename": "hero8.mp4/hero8_NA_000003.jpg", + "filetype": "image", "MAPAltitude": 7463.642846094319, "MAPCaptureTime": "2019_11_18_15_41_16_354", "MAPCompassHeading": { @@ -62,9 +68,11 @@ "MAPLongitude": -127.18475264566939, "MAPDeviceMake": "GoPro", "MAPDeviceModel": "HERO8 Black", - "filename": "hero8.mp4/hero8_NA_000003.jpg", + "MAPFilename": "hero8_NA_000003.jpg", }, { + "filename": "hero8.mp4/hero8_NA_000004.jpg", + 
"filetype": "image", "MAPAltitude": 6909.8168472111465, "MAPCaptureTime": "2019_11_18_15_41_18_354", "MAPCompassHeading": { @@ -75,9 +83,11 @@ "MAPLongitude": -126.65905680405231, "MAPDeviceMake": "GoPro", "MAPDeviceModel": "HERO8 Black", - "filename": "hero8.mp4/hero8_NA_000004.jpg", + "MAPFilename": "hero8_NA_000004.jpg", }, { + "filename": "hero8.mp4/hero8_NA_000005.jpg", + "filetype": "image", "MAPAltitude": 7212.594480737465, "MAPCaptureTime": "2019_11_18_15_41_20_354", "MAPCompassHeading": { @@ -88,9 +98,11 @@ "MAPLongitude": -126.93688762007304, "MAPDeviceMake": "GoPro", "MAPDeviceModel": "HERO8 Black", - "filename": "hero8.mp4/hero8_NA_000005.jpg", + "MAPFilename": "hero8_NA_000005.jpg", }, { + "filename": "hero8.mp4/hero8_NA_000006.jpg", + "filetype": "image", "MAPAltitude": 7274.361994963208, "MAPCaptureTime": "2019_11_18_15_41_22_354", "MAPCompassHeading": { @@ -101,7 +113,7 @@ "MAPLongitude": -126.98833423074615, "MAPDeviceMake": "GoPro", "MAPDeviceModel": "HERO8 Black", - "filename": "hero8.mp4/hero8_NA_000006.jpg", + "MAPFilename": "hero8_NA_000006.jpg", }, ] @@ -140,7 +152,7 @@ def test_process_gopro_hero8( for expected_desc in expected_descs: expected_desc["filename"] = str(sample_dir.join(expected_desc["filename"])) - verify_descs(expected_descs, Path(desc_path)) + assert_same_image_descs(Path(desc_path), expected_descs) @pytest.mark.usefixtures("setup_config") diff --git a/tests/integration/test_process.py b/tests/integration/test_process.py index 82b042e2d..a3afe06f2 100644 --- a/tests/integration/test_process.py +++ b/tests/integration/test_process.py @@ -9,13 +9,13 @@ import pytest from .fixtures import ( + assert_contains_image_descs, EXECUTABLE, IS_FFMPEG_INSTALLED, run_exiftool_and_generate_geotag_args, setup_config, setup_data, validate_and_extract_zip, - verify_descs, ) @@ -25,6 +25,7 @@ "DSC00001.JPG": { "filename": "DSC00001.JPG", "filetype": "image", + "MAPFilename": "DSC00001.JPG", "MAPLatitude": 45.5169031, "MAPLongitude": -122.572765, "MAPCaptureTime": "2018_06_08_20_24_11_000", @@ -37,6 +38,7 @@ "DSC00497.JPG": { "filename": "DSC00497.JPG", "filetype": "image", + "MAPFilename": "DSC00497.JPG", "MAPLatitude": 45.5107231, "MAPLongitude": -122.5760514, "MAPCaptureTime": "2018_06_08_20_32_28_000", @@ -49,6 +51,7 @@ "V0370574.JPG": { "filename": "V0370574.JPG", "filetype": "image", + "MAPFilename": "V0370574.JPG", "MAPLatitude": -1.0169444, "MAPLongitude": -1.0169444, "MAPCaptureTime": "2018_07_27_11_32_14_000", @@ -58,7 +61,9 @@ "MAPOrientation": 1, }, "adobe_coords.jpg": { + "filename": "adobe_coords.jpg", "filetype": "image", + "MAPFilename": "adobe_coords.jpg", "MAPLatitude": -0.0702668, "MAPLongitude": 34.3819352, "MAPCaptureTime": "2019_07_16_10_26_11_000", @@ -94,23 +99,20 @@ def test_process_images_with_defaults( x = subprocess.run(args, shell=True) assert x.returncode == 0, x.stderr - verify_descs( + assert_contains_image_descs( + Path(setup_data, "mapillary_image_description.json"), [ { **_DEFAULT_EXPECTED_DESCS["DSC00001.JPG"], - "filename": str(Path(setup_data, "images", "DSC00001.JPG")), }, { **_DEFAULT_EXPECTED_DESCS["DSC00497.JPG"], - "filename": str(Path(setup_data, "images", "DSC00497.JPG")), }, { **_DEFAULT_EXPECTED_DESCS["V0370574.JPG"], - "filename": str(Path(setup_data, "images", "V0370574.JPG")), "MAPCaptureTime": _local_to_utc("2018-07-27T11:32:14"), }, ], - Path(setup_data, "mapillary_image_description.json"), ) @@ -124,7 +126,8 @@ def test_time_with_offset(setup_data: py.path.local, use_exiftool: bool = False) args = 
run_exiftool_and_generate_geotag_args(setup_data, args) x = subprocess.run(args, shell=True) assert x.returncode == 0, x.stderr - verify_descs( + assert_contains_image_descs( + Path(setup_data, "mapillary_image_description.json"), [ { **_DEFAULT_EXPECTED_DESCS["DSC00001.JPG"], @@ -142,7 +145,6 @@ def test_time_with_offset(setup_data: py.path.local, use_exiftool: bool = False) "MAPCaptureTime": _local_to_utc("2018-07-27T11:32:16.500"), }, ], - Path(setup_data, "mapillary_image_description.json"), ) args = f"{EXECUTABLE} process --file_types=image {PROCESS_FLAGS} {setup_data} --offset_time=-1.0" @@ -150,7 +152,8 @@ def test_time_with_offset(setup_data: py.path.local, use_exiftool: bool = False) args = run_exiftool_and_generate_geotag_args(setup_data, args) x = subprocess.run(args, shell=True) assert x.returncode == 0, x.stderr - verify_descs( + assert_contains_image_descs( + Path(setup_data, "mapillary_image_description.json"), [ { **_DEFAULT_EXPECTED_DESCS["DSC00001.JPG"], @@ -168,7 +171,6 @@ def test_time_with_offset(setup_data: py.path.local, use_exiftool: bool = False) "MAPCaptureTime": _local_to_utc("2018-07-27T11:32:13.000"), }, ], - Path(setup_data, "mapillary_image_description.json"), ) @@ -201,9 +203,9 @@ def test_process_images_with_overwrite_all_EXIF_tags( "MAPCaptureTime": _local_to_utc("2018-07-27T11:32:16.500"), }, ] - verify_descs( - expected_descs, + assert_contains_image_descs( Path(setup_data, "mapillary_image_description.json"), + expected_descs, ) args = f"{EXECUTABLE} process --file_types=image {PROCESS_FLAGS} {setup_data}" @@ -211,9 +213,9 @@ def test_process_images_with_overwrite_all_EXIF_tags( args = run_exiftool_and_generate_geotag_args(setup_data, args) x = subprocess.run(args, shell=True) assert x.returncode == 0, x.stderr - verify_descs( - expected_descs, + assert_contains_image_descs( Path(setup_data, "mapillary_image_description.json"), + expected_descs, ) @@ -232,7 +234,8 @@ def test_angle_with_offset(setup_data: py.path.local, use_exiftool: bool = False x = subprocess.run(args, shell=True) assert x.returncode == 0, x.stderr - verify_descs( + assert_contains_image_descs( + Path(setup_data, "mapillary_image_description.json"), [ { **_DEFAULT_EXPECTED_DESCS["DSC00001.JPG"], @@ -260,7 +263,6 @@ def test_angle_with_offset(setup_data: py.path.local, use_exiftool: bool = False }, }, ], - Path(setup_data, "mapillary_image_description.json"), ) @@ -271,7 +273,8 @@ def test_angle_with_offset_with_exiftool(setup_data: py.path.local): def test_parse_adobe_coordinates(setup_data: py.path.local): args = f"{EXECUTABLE} process --file_types=image {PROCESS_FLAGS} {setup_data}/adobe_coords" x = subprocess.run(args, shell=True) - verify_descs( + assert_contains_image_descs( + Path(setup_data, "adobe_coords/mapillary_image_description.json"), [ { "filename": str(Path(setup_data, "adobe_coords", "adobe_coords.jpg")), @@ -285,7 +288,6 @@ def test_parse_adobe_coordinates(setup_data: py.path.local): "MAPOrientation": 1, } ], - Path(setup_data, "adobe_coords/mapillary_image_description.json"), ) @@ -385,7 +387,8 @@ def test_geotagging_images_from_gpx(setup_data: py.path.local): """, shell=True, ) - verify_descs( + assert_contains_image_descs( + Path(images, "mapillary_image_description.json"), [ { **_DEFAULT_EXPECTED_DESCS["DSC00001.JPG"], @@ -410,7 +413,6 @@ def test_geotagging_images_from_gpx(setup_data: py.path.local): }, }, ], - Path(images, "mapillary_image_description.json"), ) @@ -424,7 +426,8 @@ def test_geotagging_images_from_gpx_with_offset(setup_data: py.path.local): 
shell=True, ) assert x.returncode == 0, x.stderr - verify_descs( + assert_contains_image_descs( + Path(setup_data, "mapillary_image_description.json"), [ { **_DEFAULT_EXPECTED_DESCS["DSC00001.JPG"], @@ -451,7 +454,6 @@ def test_geotagging_images_from_gpx_with_offset(setup_data: py.path.local): }, }, ], - Path(setup_data, "mapillary_image_description.json"), ) @@ -464,7 +466,8 @@ def test_geotagging_images_from_gpx_use_gpx_start_time(setup_data: py.path.local shell=True, ) assert x.returncode == 0, x.stderr - verify_descs( + assert_contains_image_descs( + Path(setup_data, "mapillary_image_description.json"), [ { **_DEFAULT_EXPECTED_DESCS["DSC00001.JPG"], @@ -491,7 +494,6 @@ def test_geotagging_images_from_gpx_use_gpx_start_time(setup_data: py.path.local }, }, ], - Path(setup_data, "mapillary_image_description.json"), ) @@ -506,7 +508,8 @@ def test_geotagging_images_from_gpx_use_gpx_start_time_with_offset( shell=True, ) assert x.returncode == 0, x.stderr - verify_descs( + assert_contains_image_descs( + Path(setup_data, "mapillary_image_description.json"), [ { **_DEFAULT_EXPECTED_DESCS["DSC00001.JPG"], @@ -533,7 +536,6 @@ def test_geotagging_images_from_gpx_use_gpx_start_time_with_offset( }, }, ], - Path(setup_data, "mapillary_image_description.json"), ) @@ -711,7 +713,8 @@ def test_video_process_sample_with_distance(setup_data: py.path.local): shell=True, ) assert x.returncode == 0, x.stderr - verify_descs( + assert_contains_image_descs( + desc_path, [ { "filename": str( @@ -780,7 +783,6 @@ def test_video_process_sample_with_distance(setup_data: py.path.local): "MAPOrientation": 1, }, ], - desc_path, ) diff --git a/tests/integration/test_process_and_upload.py b/tests/integration/test_process_and_upload.py index a75fe502d..8b244eb04 100644 --- a/tests/integration/test_process_and_upload.py +++ b/tests/integration/test_process_and_upload.py @@ -1,5 +1,4 @@ import datetime -import os import subprocess from pathlib import Path @@ -7,14 +6,15 @@ import pytest from .fixtures import ( + assert_contains_image_descs, + assert_same_image_descs, EXECUTABLE, + extract_all_uploaded_descs, IS_FFMPEG_INSTALLED, setup_config, setup_data, setup_upload, USERNAME, - validate_and_extract_camm, - validate_and_extract_zip, ) PROCESS_FLAGS = "" @@ -130,34 +130,6 @@ } -def _validate_uploads(upload_dir: py.path.local, expected): - descs = [] - for file in upload_dir.listdir(): - if str(file).endswith(".mp4"): - descs.extend(validate_and_extract_camm(str(file))) - elif str(file).endswith(".zip"): - descs.extend(validate_and_extract_zip(Path(file))) - else: - raise Exception(f"invalid file {file}") - - excludes = [ - "filename", - "filesize", - "md5sum", - "MAPMetaTags", - "MAPSequenceUUID", - "MAPFilename", - ] - - actual = {} - for desc in descs: - actual[os.path.basename(desc["MAPFilename"])] = { - k: v for k, v in desc.items() if k not in excludes - } - - assert expected == actual - - @pytest.mark.usefixtures("setup_config") def test_process_and_upload(setup_data: py.path.local, setup_upload: py.path.local): input_paths = [ @@ -168,17 +140,17 @@ def test_process_and_upload(setup_data: py.path.local, setup_upload: py.path.loc setup_data.join("images"), setup_data.join("images").join("DSC00001.JPG"), ] - x = subprocess.run( + subprocess.run( f"{EXECUTABLE} --verbose process_and_upload {UPLOAD_FLAGS} {' '.join(map(str, input_paths))} --skip_process_errors", shell=True, + check=True, + ) + + descs = sum(extract_all_uploaded_descs(Path(setup_upload)), []) + assert_contains_image_descs( + descs, + 
[*EXPECTED_DESCS["gopro"].values(), *EXPECTED_DESCS["image"].values()], ) - assert x.returncode == 0, x.stderr - if IS_FFMPEG_INSTALLED: - _validate_uploads( - setup_upload, {**EXPECTED_DESCS["gopro"], **EXPECTED_DESCS["image"]} - ) - else: - _validate_uploads(setup_upload, {**EXPECTED_DESCS["image"]}) @pytest.mark.usefixtures("setup_config") @@ -186,7 +158,7 @@ def test_process_and_upload_images_only( setup_data: py.path.local, setup_upload: py.path.local, ): - x = subprocess.run( + subprocess.run( f"""{EXECUTABLE} --verbose process_and_upload \ {UPLOAD_FLAGS} {PROCESS_FLAGS} \ --filetypes=image \ @@ -194,9 +166,10 @@ def test_process_and_upload_images_only( {setup_data}/images {setup_data}/images {setup_data}/images/DSC00001.JPG """, shell=True, + check=True, ) - assert x.returncode == 0, x.stderr - _validate_uploads(setup_upload, EXPECTED_DESCS["image"]) + descs = sum(extract_all_uploaded_descs(Path(setup_upload)), []) + assert_contains_image_descs(descs, [*EXPECTED_DESCS["image"].values()]) @pytest.mark.usefixtures("setup_config") @@ -210,7 +183,7 @@ def test_video_process_and_upload( gpx_start_time = "2025_03_14_07_00_00_000" gpx_end_time = "2025_03_14_07_01_33_624" gpx_file = setup_data.join("gpx").join("sf_30km_h.gpx") - x = subprocess.run( + subprocess.run( f"""{EXECUTABLE} video_process_and_upload \ {PROCESS_FLAGS} {UPLOAD_FLAGS} \ --video_sample_interval=2 \ @@ -222,11 +195,12 @@ def test_video_process_and_upload( {video_dir} {video_dir.join("my_samples")} """, shell=True, + check=True, ) - assert x.returncode == 0, x.stderr - assert 1 == len(setup_upload.listdir()) expected = { "sample-5s_NA_000001.jpg": { + "filename": "sample-5s_NA_000001.jpg", + "MAPFilename": "sample-5s_NA_000001.jpg", "MAPAltitude": 94.75, "MAPCaptureTime": "2025_03_14_07_00_00_000", "MAPCompassHeading": { @@ -239,6 +213,8 @@ def test_video_process_and_upload( "filetype": "image", }, "sample-5s_NA_000002.jpg": { + "filename": "sample-5s_NA_000002.jpg", + "MAPFilename": "sample-5s_NA_000002.jpg", "MAPAltitude": 93.347, "MAPCaptureTime": "2025_03_14_07_00_02_000", "MAPCompassHeading": { @@ -251,6 +227,8 @@ def test_video_process_and_upload( "filetype": "image", }, "sample-5s_NA_000003.jpg": { + "filename": "sample-5s_NA_000003.jpg", + "MAPFilename": "sample-5s_NA_000003.jpg", "MAPAltitude": 92.492, "MAPCaptureTime": "2025_03_14_07_00_04_000", "MAPCompassHeading": { @@ -263,7 +241,8 @@ def test_video_process_and_upload( "filetype": "image", }, } - _validate_uploads(setup_upload, expected) + descs = sum(extract_all_uploaded_descs(Path(setup_upload)), []) + assert_same_image_descs(descs, list(expected.values())) @pytest.mark.usefixtures("setup_config") @@ -278,7 +257,7 @@ def test_video_process_and_upload_after_gpx( gpx_end_time = "2025_03_14_07_01_33_624" video_start_time = "2025_03_14_07_01_34_624" gpx_file = setup_data.join("gpx").join("sf_30km_h.gpx") - x = subprocess.run( + subprocess.run( f"""{EXECUTABLE} video_process_and_upload \ {PROCESS_FLAGS} {UPLOAD_FLAGS} \ --video_sample_interval=2 \ @@ -291,7 +270,7 @@ def test_video_process_and_upload_after_gpx( {video_dir} {video_dir.join("my_samples")} """, shell=True, + check=True, ) - assert x.returncode == 0, x.stderr - assert 0 == len(setup_upload.listdir()) - _validate_uploads(setup_upload, {}) + descs = sum(extract_all_uploaded_descs(Path(setup_upload)), []) + assert_same_image_descs(descs, []) diff --git a/tests/integration/test_upload.py b/tests/integration/test_upload.py index 753622d52..04a258b29 100644 --- a/tests/integration/test_upload.py +++ 
b/tests/integration/test_upload.py @@ -6,12 +6,13 @@ import pytest from .fixtures import ( + assert_contains_image_descs, EXECUTABLE, + extract_all_uploaded_descs, setup_config, setup_data, setup_upload, USERNAME, - validate_uploaded_images, ) @@ -21,53 +22,59 @@ @pytest.mark.usefixtures("setup_config") def test_upload_image_dir(setup_data: py.path.local, setup_upload: py.path.local): - x = subprocess.run( - f"{EXECUTABLE} process --file_types=image {PROCESS_FLAGS} {setup_data}", + subprocess.run( + f"{EXECUTABLE} process {PROCESS_FLAGS} --file_types=image {setup_data}", shell=True, + check=True, ) - desc_path = setup_data.join("mapillary_image_description.json") - with open(desc_path) as fp: - descs = json.load(fp) - for desc in descs: - # TODO: check if the descs are valid - pass - x = subprocess.run( + subprocess.run( f"{EXECUTABLE} process_and_upload {UPLOAD_FLAGS} --file_types=image {setup_data}", shell=True, + check=True, ) - assert x.returncode == 0, x.stderr - descs = validate_uploaded_images(Path(setup_upload)) - for x in descs: - # TODO: check if the descs are valid - pass + uploaded_descs: list[dict] = sum(extract_all_uploaded_descs(Path(setup_upload)), []) + assert len(uploaded_descs) > 0, "No images were uploaded" + + assert_contains_image_descs( + Path(setup_data.join("mapillary_image_description.json")), + uploaded_descs, + ) @pytest.mark.usefixtures("setup_config") def test_upload_image_dir_twice(setup_data: py.path.local, setup_upload: py.path.local): - x = subprocess.run( + subprocess.run( f"{EXECUTABLE} process --skip_process_errors {PROCESS_FLAGS} {setup_data}", shell=True, + check=True, ) - assert x.returncode == 0, x.stderr desc_path = setup_data.join("mapillary_image_description.json") # first upload - x = subprocess.run( + subprocess.run( f"{EXECUTABLE} process_and_upload {UPLOAD_FLAGS} --file_types=image {setup_data}", shell=True, + check=True, + ) + first_descs = extract_all_uploaded_descs(Path(setup_upload)) + assert_contains_image_descs( + Path(desc_path), + sum(first_descs, []), ) - assert x.returncode == 0, x.stderr - validate_uploaded_images(Path(setup_upload)) # expect the second upload to not produce new uploads - x = subprocess.run( + subprocess.run( f"{EXECUTABLE} process_and_upload {UPLOAD_FLAGS} --desc_path={desc_path} --file_types=image {setup_data} {setup_data} {setup_data}/images/DSC00001.JPG", shell=True, + check=True, + ) + second_descs = extract_all_uploaded_descs(Path(setup_upload)) + assert_contains_image_descs( + Path(desc_path), + sum(second_descs, []), ) - assert x.returncode == 0, x.stderr - validate_uploaded_images(Path(setup_upload)) @pytest.mark.usefixtures("setup_config") diff --git a/tests/unit/test_uploader.py b/tests/unit/test_uploader.py index 71f21c87f..a2d3a2ff0 100644 --- a/tests/unit/test_uploader.py +++ b/tests/unit/test_uploader.py @@ -1,16 +1,14 @@ -import json import os import typing as T -import zipfile from pathlib import Path import py.path import pytest -from mapillary_tools import api_v4, types, upload_api_v4, uploader, utils +from mapillary_tools import api_v4, types, uploader -from ..integration.fixtures import setup_upload, validate_and_extract_zip +from ..integration.fixtures import extract_all_uploaded_descs, setup_upload IMPORT_PATH = "tests/unit/data" @@ -26,24 +24,6 @@ def setup_unittest_data(tmpdir: py.path.local): tmpdir.remove(ignore_errors=True) -def _validate_zip_dir(zip_dir: py.path.local): - descs = [] - for zip_path in zip_dir.listdir(): - with zipfile.ZipFile(zip_path) as ziph: - filename = 
ziph.testzip() - assert filename is None, f"Corrupted zip {zip_path}: {filename}" - sequence_md5sum = json.loads(ziph.comment).get("sequence_md5sum") - - with open(zip_path, "rb") as fp: - upload_md5sum = utils.md5sum_fp(fp).hexdigest() - - assert str(os.path.basename(zip_path)) == f"mly_tools_{upload_md5sum}.zip", ( - zip_path - ) - descs.extend(validate_and_extract_zip(Path(zip_path))) - return descs - - def test_upload_images(setup_unittest_data: py.path.local, setup_upload: py.path.local): mly_uploader = uploader.Uploader( {"user_upload_token": "YOUR_USER_ACCESS_TOKEN"}, dry_run=True @@ -74,9 +54,10 @@ def test_upload_images(setup_unittest_data: py.path.local, setup_upload: py.path ) ) assert len(results) == 1 - assert len(setup_upload.listdir()) == 1 - actual_descs = _validate_zip_dir(setup_upload) - assert 1 == len(actual_descs), "should return 1 desc because of the unique filename" + actual_descs = sum(extract_all_uploaded_descs(Path(setup_upload)), []) + assert 1 == len(actual_descs), ( + f"should return 1 desc because of the unique filename but got {actual_descs}" + ) def test_upload_images_multiple_sequences( @@ -128,8 +109,7 @@ def test_upload_images_multiple_sequences( ) ) assert len(results) == 2 - assert len(setup_upload.listdir()) == 2 - actual_descs = _validate_zip_dir(setup_upload) + actual_descs = sum(extract_all_uploaded_descs(Path(setup_upload)), []) assert 2 == len(actual_descs) @@ -173,11 +153,10 @@ def test_upload_zip( }, ] zip_dir = setup_unittest_data.mkdir("zip_dir") - sequence = [types.from_desc(T.cast(T.Any, desc)) for desc in descs] - uploader.ZipImageSequence.zip_images(sequence, Path(zip_dir)) + uploader.ZipImageSequence.zip_images( + [types.from_desc(T.cast(T.Any, desc)) for desc in descs], Path(zip_dir) + ) assert len(zip_dir.listdir()) == 2, list(zip_dir.listdir()) - descs = _validate_zip_dir(zip_dir) - assert 3 == len(descs) mly_uploader = uploader.Uploader( { @@ -191,8 +170,8 @@ def test_upload_zip( for zip_path in zip_dir.listdir(): cluster = uploader.ZipImageSequence.upload_zipfile(mly_uploader, Path(zip_path)) assert cluster == "0" - descs = _validate_zip_dir(setup_upload) - assert 3 == len(descs) + actual_descs = sum(extract_all_uploaded_descs(Path(setup_upload)), []) + assert 3 == len(actual_descs) def test_upload_blackvue( @@ -213,16 +192,15 @@ def test_upload_blackvue( with Path(blackvue_path).open("rb") as fp: file_handle = mly_uploader.upload_stream( fp, - "this_is_a_blackvue.mp4", + session_key="this_is_a_blackvue.mp4", ) cluster_id = mly_uploader.finish_upload( file_handle, api_v4.ClusterFileType.BLACKVUE ) assert cluster_id == "0" - for mp4_path in setup_upload.listdir(): - assert os.path.basename(mp4_path) == "this_is_a_blackvue.mp4" - with open(mp4_path, "rb") as fp: - assert fp.read() == b"this is a fake video" + assert setup_upload.join("this_is_a_blackvue.mp4").exists() + with open(setup_upload.join("this_is_a_blackvue.mp4"), "rb") as fp: + assert fp.read() == b"this is a fake video" def test_upload_zip_with_emitter( @@ -241,8 +219,8 @@ def _upload_start(payload): assert "test_started" not in payload payload["test_started"] = True - assert payload["upload_md5sum"] not in stats - stats[payload["upload_md5sum"]] = {**payload} + assert payload["sequence_md5sum"] not in stats + stats[payload["sequence_md5sum"]] = {**payload} @emitter.on("upload_fetch_offset") def _fetch_offset(payload): @@ -250,7 +228,7 @@ def _fetch_offset(payload): assert payload["test_started"] payload["test_fetch_offset"] = True - assert payload["upload_md5sum"] in stats 
+ assert payload["sequence_md5sum"] in stats @emitter.on("upload_end") def _upload_end(payload): @@ -258,8 +236,8 @@ def _upload_end(payload): assert payload["test_started"] assert payload["test_fetch_offset"] - assert payload["upload_md5sum"] in stats + assert payload["sequence_md5sum"] in stats test_upload_zip(setup_unittest_data, setup_upload, emitter=emitter) - assert len(stats) == 2 + assert len(stats) == 2, stats From 75625c955dc08019f20f872df44c6ac65807d334 Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Tue, 17 Jun 2025 16:22:33 -0700 Subject: [PATCH 08/15] format --- tests/unit/test_uploader.py | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/tests/unit/test_uploader.py b/tests/unit/test_uploader.py index a2d3a2ff0..51b147d43 100644 --- a/tests/unit/test_uploader.py +++ b/tests/unit/test_uploader.py @@ -114,9 +114,7 @@ def test_upload_images_multiple_sequences( def test_upload_zip( - setup_unittest_data: py.path.local, - setup_upload: py.path.local, - emitter=None, + setup_unittest_data: py.path.local, setup_upload: py.path.local, emitter=None ): test_exif = setup_unittest_data.join("test_exif.jpg") setup_unittest_data.join("another_directory").mkdir() @@ -174,10 +172,7 @@ def test_upload_zip( assert 3 == len(actual_descs) -def test_upload_blackvue( - tmpdir: py.path.local, - setup_upload: py.path.local, -): +def test_upload_blackvue(tmpdir: py.path.local, setup_upload: py.path.local): mly_uploader = uploader.Uploader( { "user_upload_token": "YOUR_USER_ACCESS_TOKEN", @@ -204,9 +199,7 @@ def test_upload_blackvue( def test_upload_zip_with_emitter( - setup_unittest_data: py.path.local, - tmpdir: py.path.local, - setup_upload: py.path.local, + setup_unittest_data: py.path.local, setup_upload: py.path.local ): emitter = uploader.EventEmitter() From 11161c3581c461c669eed0303c3e55461a06baec Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Tue, 17 Jun 2025 16:23:08 -0700 Subject: [PATCH 09/15] format --- tests/unit/test_uploader.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/unit/test_uploader.py b/tests/unit/test_uploader.py index 51b147d43..f9367d900 100644 --- a/tests/unit/test_uploader.py +++ b/tests/unit/test_uploader.py @@ -1,4 +1,3 @@ -import os import typing as T from pathlib import Path From ed3f07de38e0c34dbed9124bfa811e30cbdd1eb0 Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Tue, 17 Jun 2025 18:42:22 -0700 Subject: [PATCH 10/15] parallel uploading --- mapillary_tools/uploader.py | 39 ++++++++++++++++++++++++------------- 1 file changed, 25 insertions(+), 14 deletions(-) diff --git a/mapillary_tools/uploader.py b/mapillary_tools/uploader.py index d153616ce..fa7603ac6 100644 --- a/mapillary_tools/uploader.py +++ b/mapillary_tools/uploader.py @@ -1,5 +1,7 @@ from __future__ import annotations +import concurrent.futures + import dataclasses import io import json @@ -7,6 +9,7 @@ import os import struct import tempfile +import threading import time import typing as T import uuid @@ -359,31 +362,39 @@ def _upload_sequence( if progress is None: progress = {} - _validate_metadatas(sequence) - - # TODO: assert sequence is sorted - - # FIXME: This is a hack to disable the event emitter inside the uploader - uploader.emittion_disabled = True + lock = threading.Lock() + total_image_bytes = 0 - uploader.emitter.emit("upload_start", progress) + def _upload_image(image_metadata: types.ImageMetadata) -> str: + nonlocal total_image_bytes - image_file_handles: list[str] = [] - total_bytes = 0 - for image_metadata in sequence: mutable_progress: dict[str, 
T.Any] = { **progress, "filename": str(image_metadata.filename), } bytes = cls._dump_image_bytes(image_metadata) - total_bytes += len(bytes) file_handle = uploader.upload_stream( io.BytesIO(bytes), progress=mutable_progress ) - image_file_handles.append(file_handle) - uploader.emitter.emit("upload_progress", mutable_progress) + with lock: + uploader.emitter.emit("upload_progress", mutable_progress) + total_image_bytes += len(bytes) + + return file_handle + + _validate_metadatas(sequence) + + # TODO: assert sequence is sorted + + # FIXME: This is a hack to disable the event emitter inside the uploader + uploader.emittion_disabled = True + + uploader.emitter.emit("upload_start", progress) + + with concurrent.futures.ThreadPoolExecutor() as executor: + image_file_handles = list(executor.map(_upload_image, sequence)) manifest = { "version": "1", @@ -398,7 +409,7 @@ def _upload_sequence( manifest_fp, session_key=f"{uuid.uuid4().hex}.json" ) - progress["entity_size"] = total_bytes + progress["entity_size"] = total_image_bytes uploader.emitter.emit("upload_end", progress) # FIXME: This is a hack to disable the event emitter inside the uploader From e337c9a8253886740b478f2be56e91cee7529365 Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Tue, 17 Jun 2025 23:16:02 -0700 Subject: [PATCH 11/15] fix types --- mapillary_tools/uploader.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/mapillary_tools/uploader.py b/mapillary_tools/uploader.py index fa7603ac6..87945860b 100644 --- a/mapillary_tools/uploader.py +++ b/mapillary_tools/uploader.py @@ -641,7 +641,9 @@ def finish_upload( return cluster_id - def _maybe_emit(self, event: EventName, progress: dict[str, T.Any]): + def _maybe_emit( + self, event: EventName, progress: dict[str, T.Any] | UploaderProgress + ): if not self.emittion_disabled: return self.emitter.emit(event, progress) From 7f59cab7dc96c726b1e4237baff85fd3616e80a1 Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Tue, 17 Jun 2025 23:49:36 -0700 Subject: [PATCH 12/15] types --- mapillary_tools/types.py | 2 +- mapillary_tools/uploader.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/mapillary_tools/types.py b/mapillary_tools/types.py index 747b421b7..85c57fc14 100644 --- a/mapillary_tools/types.py +++ b/mapillary_tools/types.py @@ -144,7 +144,7 @@ class UserItem(TypedDict, total=False): # Not in use. 
Keep here for back-compatibility MAPSettingsUsername: str MAPSettingsUserKey: str - user_upload_token: str + user_upload_token: T.Required[str] class _CompassHeading(TypedDict, total=True): diff --git a/mapillary_tools/uploader.py b/mapillary_tools/uploader.py index 87945860b..9479103e3 100644 --- a/mapillary_tools/uploader.py +++ b/mapillary_tools/uploader.py @@ -368,7 +368,7 @@ def _upload_sequence( def _upload_image(image_metadata: types.ImageMetadata) -> str: nonlocal total_image_bytes - mutable_progress: dict[str, T.Any] = { + mutable_progress = { **progress, "filename": str(image_metadata.filename), } From a23e4194c86674738c8bcf9f9733129c8ac83e28 Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Wed, 18 Jun 2025 11:11:23 -0700 Subject: [PATCH 13/15] fix progresses --- mapillary_tools/types.py | 62 ++++++++++++++++--------------------- mapillary_tools/upload.py | 45 ++------------------------- mapillary_tools/uploader.py | 11 +++---- 3 files changed, 33 insertions(+), 85 deletions(-) diff --git a/mapillary_tools/types.py b/mapillary_tools/types.py index 85c57fc14..fa886518a 100644 --- a/mapillary_tools/types.py +++ b/mapillary_tools/types.py @@ -6,11 +6,17 @@ import hashlib import json import os +import sys import typing as T import uuid from pathlib import Path from typing import Literal, TypedDict +if sys.version_info >= (3, 12): + from typing import NotRequired, Required +else: + from typing_extensions import NotRequired, Required + import jsonschema from . import exceptions, geo, utils @@ -144,7 +150,7 @@ class UserItem(TypedDict, total=False): # Not in use. Keep here for back-compatibility MAPSettingsUsername: str MAPSettingsUserKey: str - user_upload_token: T.Required[str] + user_upload_token: Required[str] class _CompassHeading(TypedDict, total=True): @@ -152,65 +158,49 @@ class _CompassHeading(TypedDict, total=True): MagneticHeading: float -class _ImageRequired(TypedDict, total=True): - MAPLatitude: float - MAPLongitude: float - MAPCaptureTime: str +class _SharedDescription(TypedDict, total=False): + filename: Required[str] + filetype: Required[str] + + # if None or absent, it will be calculated + md5sum: str | None + filesize: int | None -class _Image(_ImageRequired, total=False): +class ImageDescription(_SharedDescription, total=False): + MAPLatitude: Required[float] + MAPLongitude: Required[float] MAPAltitude: float + MAPCaptureTime: Required[str] MAPCompassHeading: _CompassHeading - -class _SequenceOnly(TypedDict, total=False): - MAPSequenceUUID: str - - -class MetaProperties(TypedDict, total=False): MAPDeviceMake: str MAPDeviceModel: str MAPGPSAccuracyMeters: float MAPCameraUUID: str MAPOrientation: int - -class ImageDescription(_SequenceOnly, _Image, MetaProperties, total=True): - # filename is required - filename: str - # if None or absent, it will be calculated - md5sum: str | None - filetype: Literal["image"] - filesize: int | None - - -class _VideoDescriptionRequired(TypedDict, total=True): - filename: str - md5sum: str | None - filetype: str - MAPGPSTrack: list[T.Sequence[float | int | None]] + # For grouping images in a sequence + MAPSequenceUUID: str -class VideoDescription(_VideoDescriptionRequired, total=False): +class VideoDescription(_SharedDescription, total=False): + MAPGPSTrack: Required[list[T.Sequence[float | int | None]]] MAPDeviceMake: str MAPDeviceModel: str - filesize: int | None class _ErrorDescription(TypedDict, total=False): # type and message are required - type: str + type: Required[str] message: str # vars is optional vars: dict -class 
_ImageDescriptionErrorRequired(TypedDict, total=True): - filename: str - error: _ErrorDescription - - -class ImageDescriptionError(_ImageDescriptionErrorRequired, total=False): +class ImageDescriptionError(TypedDict, total=False): + filename: Required[str] + error: Required[_ErrorDescription] filetype: str diff --git a/mapillary_tools/upload.py b/mapillary_tools/upload.py index ae28205b6..6387d77db 100644 --- a/mapillary_tools/upload.py +++ b/mapillary_tools/upload.py @@ -192,49 +192,10 @@ def _setup_tdqm(emitter: uploader.EventEmitter) -> None: upload_pbar: tqdm | None = None @emitter.on("upload_start") - def upload_start_for_images(payload: uploader.Progress) -> None: - nonlocal upload_pbar - - if payload.get("file_type") != FileType.IMAGE.value: - return - - if upload_pbar is not None: - upload_pbar.close() - - nth = payload["sequence_idx"] + 1 - total = payload["total_sequence_count"] - filetype = payload.get("file_type", "unknown").upper() - import_path: str | None = payload.get("import_path") - if import_path is None: - desc = f"Uploading {filetype} ({nth}/{total})" - else: - desc = ( - f"Uploading {filetype} {os.path.basename(import_path)} ({nth}/{total})" - ) - upload_pbar = tqdm( - total=payload["sequence_image_count"], - desc=desc, - unit="B", - unit_scale=True, - unit_divisor=1024, - initial=0, - disable=LOG.getEffectiveLevel() <= logging.DEBUG, - ) - - @emitter.on("upload_progress") - def upload_progress_for_images(payload: uploader.Progress) -> None: - if payload.get("file_type") != FileType.IMAGE.value: - return - assert upload_pbar is not None, "progress_bar must be initialized" - upload_pbar.update() - @emitter.on("upload_fetch_offset") - def upload_fetch_offset(payload: uploader.Progress) -> None: + def upload_start(payload: uploader.Progress) -> None: nonlocal upload_pbar - if payload.get("file_type") == FileType.IMAGE.value: - return - if upload_pbar is not None: upload_pbar.close() @@ -254,14 +215,12 @@ def upload_fetch_offset(payload: uploader.Progress) -> None: unit="B", unit_scale=True, unit_divisor=1024, - initial=payload["offset"], + initial=payload.get("offset", 0), disable=LOG.getEffectiveLevel() <= logging.DEBUG, ) @emitter.on("upload_progress") def upload_progress(payload: uploader.Progress) -> None: - if payload.get("file_type") == FileType.IMAGE.value: - return assert upload_pbar is not None, "progress_bar must be initialized" upload_pbar.update(payload["chunk_size"]) diff --git a/mapillary_tools/uploader.py b/mapillary_tools/uploader.py index 9479103e3..c68756271 100644 --- a/mapillary_tools/uploader.py +++ b/mapillary_tools/uploader.py @@ -363,13 +363,10 @@ def _upload_sequence( progress = {} lock = threading.Lock() - total_image_bytes = 0 def _upload_image(image_metadata: types.ImageMetadata) -> str: - nonlocal total_image_bytes - mutable_progress = { - **progress, + **(progress or {}), "filename": str(image_metadata.filename), } @@ -378,14 +375,17 @@ def _upload_image(image_metadata: types.ImageMetadata) -> str: io.BytesIO(bytes), progress=mutable_progress ) + mutable_progress["chunk_size"] = image_metadata.filesize + with lock: uploader.emitter.emit("upload_progress", mutable_progress) - total_image_bytes += len(bytes) return file_handle _validate_metadatas(sequence) + progress["entity_size"] = sum(m.filesize or 0 for m in sequence) + # TODO: assert sequence is sorted # FIXME: This is a hack to disable the event emitter inside the uploader @@ -409,7 +409,6 @@ def _upload_image(image_metadata: types.ImageMetadata) -> str: manifest_fp, 
session_key=f"{uuid.uuid4().hex}.json" ) - progress["entity_size"] = total_image_bytes uploader.emitter.emit("upload_end", progress) # FIXME: This is a hack to disable the event emitter inside the uploader From b6047e9280e645a98e7bcf0fc9597cdbf089815f Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Wed, 18 Jun 2025 11:14:13 -0700 Subject: [PATCH 14/15] ruff --- mapillary_tools/types.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/mapillary_tools/types.py b/mapillary_tools/types.py index fa886518a..b4fe31615 100644 --- a/mapillary_tools/types.py +++ b/mapillary_tools/types.py @@ -10,12 +10,12 @@ import typing as T import uuid from pathlib import Path -from typing import Literal, TypedDict +from typing import TypedDict -if sys.version_info >= (3, 12): - from typing import NotRequired, Required +if sys.version_info >= (3, 11): + from typing import Required else: - from typing_extensions import NotRequired, Required + from typing_extensions import Required import jsonschema From 47a84fa04cd55b219646992515acf554f9d6224f Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Wed, 18 Jun 2025 14:31:21 -0700 Subject: [PATCH 15/15] fix max number of upload workers --- mapillary_tools/constants.py | 2 ++ mapillary_tools/uploader.py | 4 +++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/mapillary_tools/constants.py b/mapillary_tools/constants.py index c37e61003..fc1fc4007 100644 --- a/mapillary_tools/constants.py +++ b/mapillary_tools/constants.py @@ -89,3 +89,5 @@ def _yes_or_no(val: str) -> bool: "upload_history", ), ) + +MAX_IMAGE_UPLOAD_WORKERS = int(os.getenv(_ENV_PREFIX + "MAX_IMAGE_UPLOAD_WORKERS", 64)) diff --git a/mapillary_tools/uploader.py b/mapillary_tools/uploader.py index c68756271..4fba04f20 100644 --- a/mapillary_tools/uploader.py +++ b/mapillary_tools/uploader.py @@ -393,7 +393,9 @@ def _upload_image(image_metadata: types.ImageMetadata) -> str: uploader.emitter.emit("upload_start", progress) - with concurrent.futures.ThreadPoolExecutor() as executor: + with concurrent.futures.ThreadPoolExecutor( + max_workers=constants.MAX_IMAGE_UPLOAD_WORKERS + ) as executor: image_file_handles = list(executor.map(_upload_image, sequence)) manifest = {
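
Note: PATCH 10/15 parallelizes the per-image uploads with a thread pool, and
PATCH 15/15 caps that pool via constants.MAX_IMAGE_UPLOAD_WORKERS (read from
an environment variable built on the module's _ENV_PREFIX, defaulting to 64).
The pattern keeps the manifest correct because Executor.map() yields results
in submission order, so image_file_handles[i] always corresponds to
sequence[i] even when uploads finish out of order, and the threading.Lock
serializes the shared progress emission. Below is a minimal self-contained
sketch of the same pattern, for illustration only; every name in it is a
hypothetical stand-in, not a mapillary_tools API:

    import concurrent.futures
    import threading

    MAX_WORKERS = 4  # stand-in for constants.MAX_IMAGE_UPLOAD_WORKERS

    lock = threading.Lock()
    uploaded_bytes = 0  # shared progress state, guarded by `lock`

    def upload_one(payload: bytes) -> str:
        # Stand-in for Uploader.upload_stream(); returns a "file handle".
        global uploaded_bytes
        handle = f"handle-{len(payload)}"
        with lock:
            # Progress bookkeeping is serialized across workers,
            # mirroring the lock-guarded emit in _upload_image().
            uploaded_bytes += len(payload)
        return handle

    payloads = [b"a" * 10, b"b" * 20, b"c" * 30]
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex:
        # map() preserves input order, so handles[i] matches payloads[i]
        # regardless of which upload finishes first.
        handles = list(ex.map(upload_one, payloads))

    assert handles == ["handle-10", "handle-20", "handle-30"]
    assert uploaded_bytes == 60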