From d6b7eee2765f1b57b2fc21383debe9419a940a0b Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Mon, 17 Mar 2025 15:13:54 -0700 Subject: [PATCH 1/6] group functions in ZipFileSequence --- mapillary_tools/upload.py | 2 +- mapillary_tools/uploader.py | 221 +++++++++++++++++++++---------- tests/unit/test_upload_api_v4.py | 18 ++- tests/unit/test_uploader.py | 64 ++++++++- 4 files changed, 227 insertions(+), 78 deletions(-) diff --git a/mapillary_tools/upload.py b/mapillary_tools/upload.py index 0bb19ce17..98b031d72 100644 --- a/mapillary_tools/upload.py +++ b/mapillary_tools/upload.py @@ -136,7 +136,7 @@ def zip_images( metadata for metadata in metadatas if isinstance(metadata, types.ImageMetadata) ] - uploader.zip_images(image_metadatas, zip_dir) + uploader.ZipFileSequence.zip_sequence(image_metadatas, zip_dir) def fetch_user_items( diff --git a/mapillary_tools/uploader.py b/mapillary_tools/uploader.py index 19ed34992..38fba6d87 100644 --- a/mapillary_tools/uploader.py +++ b/mapillary_tools/uploader.py @@ -1,3 +1,4 @@ +from collections.abc import Buffer import io import json import logging @@ -88,6 +89,144 @@ def emit(self, event: EventName, *args, **kwargs): callback(*args, **kwargs) +class MyBytesIO(io.BufferedIOBase): + def __init__(self) -> None: + self._tell = 0 + self._buffer = io.BytesIO() + + def write(self, data: bytes | Buffer) -> int: + n = self._buffer.write(data) + self._tell += n + return n + + def pop(self) -> bytes: + data = self._buffer.getvalue() + self._buffer = io.BytesIO() + return data + + def flush(self): + self._buffer = io.BytesIO() + + def tell(self) -> int: + return self._tell + + +class ZipFileSequence: + @classmethod + def generate_zip_chunks( + cls, + sequence: T.Sequence[types.ImageMetadata], + upload_md5sum: str, + ) -> T.Generator[bytes, None, None]: + _sequences = types.group_and_sort_images(sequence) + assert len(_sequences) == 1, ( + f"Only one sequence is allowed but got {len(_sequences)}: {list(_sequences.keys())}" + ) + + zip_fp = MyBytesIO() + with zipfile.ZipFile(zip_fp, "w", zipfile.ZIP_DEFLATED) as zipf: + arcnames: T.Set[str] = set() + for metadata in sequence: + cls._write_imagebytes_in_zip(zipf, metadata, arcnames=arcnames) + yield zip_fp.pop() + zipf.comment = json.dumps({"upload_md5sum": upload_md5sum}).encode("utf-8") + yield zip_fp.pop() + yield zip_fp.pop() + + @classmethod + def zip_images( + cls, metadatas: T.Sequence[types.ImageMetadata], zip_dir: Path + ) -> None: + """ + Group images into sequences and zip each sequence into a zipfile. 
+ """ + _validate_metadatas(metadatas) + sequences = types.group_and_sort_images(metadatas) + os.makedirs(zip_dir, exist_ok=True) + for sequence_uuid, sequence in sequences.items(): + # md5sum + for metadata in sequence: + metadata.update_md5sum() + upload_md5sum = types.sequence_md5sum(sequence) + + # For atomicity we write into a WIP file and then rename to the final file + wip_zip_filename = zip_dir.joinpath( + f".mly_zip_{uuid.uuid4()}_{sequence_uuid}_{os.getpid()}_{int(time.time())}" + ) + zip_filename = zip_dir.joinpath(f"mly_tools_{upload_md5sum}.zip") + with wip_file_context(wip_zip_filename, zip_filename) as wip_path: + with wip_path.open("wb") as wip_fp: + cls.zip_sequence_fp(sequence, wip_fp, upload_md5sum) + + @classmethod + def zip_sequence_fp( + cls, + sequence: T.Sequence[types.ImageMetadata], + zip_fp: T.IO[bytes], + upload_md5sum: str, + ) -> None: + _sequences = types.group_and_sort_images(sequence) + assert len(_sequences) == 1, ( + f"Only one sequence is allowed but got {len(_sequences)}: {list(_sequences.keys())}" + ) + + with zipfile.ZipFile(zip_fp, "w", zipfile.ZIP_DEFLATED) as zipf: + arcnames: T.Set[str] = set() + for metadata in sequence: + cls._write_imagebytes_in_zip(zipf, metadata, arcnames=arcnames) + assert len(sequence) == len(set(zipf.namelist())) + zipf.comment = json.dumps({"upload_md5sum": upload_md5sum}).encode("utf-8") + + @classmethod + def _uniq_arcname(cls, filename: Path, arcnames: T.Set[str]): + arcname: str = filename.name + + # make sure the arcname is unique, otherwise zipfile.extractAll will eliminate duplicated ones + arcname_idx = 0 + while arcname in arcnames: + arcname_idx += 1 + arcname = f"{filename.stem}_{arcname_idx}{filename.suffix}" + + return arcname + + @classmethod + def _write_imagebytes_in_zip( + cls, + zipf: zipfile.ZipFile, + metadata: types.ImageMetadata, + arcnames: T.Optional[T.Set[str]] = None, + ): + if arcnames is None: + arcnames = set() + + edit = exif_write.ExifEdit(metadata.filename) + # The cast is to fix the type checker error + edit.add_image_description( + T.cast(T.Dict, types.desc_file_to_exif(types.as_desc(metadata))) + ) + image_bytes = edit.dump_image_bytes() + + arcname = cls._uniq_arcname(metadata.filename, arcnames) + arcnames.add(arcname) + + zipinfo = zipfile.ZipInfo(arcname, date_time=(1980, 1, 1, 0, 0, 0)) + zipf.writestr(zipinfo, image_bytes) + + @classmethod + def _extract_upload_md5sum(cls, zip_fp: T.IO[bytes]) -> T.Optional[str]: + with zipfile.ZipFile(zip_fp, "r", zipfile.ZIP_DEFLATED) as ziph: + comment = ziph.comment + if not comment: + return None + try: + upload_md5sum = json.loads(comment.decode("utf-8")).get("upload_md5sum") + except Exception: + return None + if not upload_md5sum: + return None + return str(upload_md5sum) + + class Uploader: def __init__( self, @@ -121,16 +260,16 @@ def upload_zipfile( "sequence_image_count": len(namelist), } - with zip_path.open("rb") as fp: - upload_md5sum = _extract_upload_md5sum(fp) + with zip_path.open("rb") as zip_fp: + upload_md5sum = ZipFileSequence._extract_upload_md5sum(zip_fp) if upload_md5sum is None: - with zip_path.open("rb") as fp: - upload_md5sum = utils.md5sum_fp(fp).hexdigest() + with zip_path.open("rb") as zip_fp: + upload_md5sum = utils.md5sum_fp(zip_fp).hexdigest() - with zip_path.open("rb") as fp: + with zip_path.open("rb") as zip_fp: return self.upload_stream( - fp, + zip_fp, upload_api_v4.ClusterFileType.ZIP, upload_md5sum, event_payload=final_event_payload, @@ -159,7 +298,7 @@ def upload_images( metadata.update_md5sum() 
upload_md5sum = types.sequence_md5sum(sequence) with tempfile.NamedTemporaryFile() as fp: - _zip_sequence_fp(sequence, fp, upload_md5sum) + ZipFileSequence.zip_sequence_fp(sequence, fp, upload_md5sum) cluster_id = self.upload_stream( fp, upload_api_v4.ClusterFileType.ZIP, @@ -216,7 +355,7 @@ def upload_stream( } try: - return _upload_stream( + return _upload_stream_with_retries( upload_service, fp, event_payload=final_event_payload, @@ -254,70 +393,6 @@ def wip_file_context(wip_path: Path, done_path: Path): pass -def zip_images( - metadatas: T.List[types.ImageMetadata], - zip_dir: Path, -) -> None: - _validate_metadatas(metadatas) - sequences = types.group_and_sort_images(metadatas) - os.makedirs(zip_dir, exist_ok=True) - for sequence_uuid, sequence in sequences.items(): - for metadata in sequence: - metadata.update_md5sum() - upload_md5sum = types.sequence_md5sum(sequence) - timestamp = int(time.time()) - wip_zip_filename = zip_dir.joinpath( - f".mly_zip_{uuid.uuid4()}_{sequence_uuid}_{os.getpid()}_{timestamp}" - ) - zip_filename = zip_dir.joinpath(f"mly_tools_{upload_md5sum}.zip") - with wip_file_context(wip_zip_filename, zip_filename) as wip_dir: - with wip_dir.open("wb") as fp: - _zip_sequence_fp(sequence, fp, upload_md5sum) - - -def _zip_sequence_fp( - sequence: T.Sequence[types.ImageMetadata], - fp: T.IO[bytes], - upload_md5sum: str, -) -> None: - arcname_idx = 0 - arcnames = set() - with zipfile.ZipFile(fp, "w", zipfile.ZIP_DEFLATED) as ziph: - for metadata in sequence: - edit = exif_write.ExifEdit(metadata.filename) - # The cast is to fix the type checker error - edit.add_image_description( - T.cast(T.Dict, types.desc_file_to_exif(types.as_desc(metadata))) - ) - image_bytes = edit.dump_image_bytes() - arcname: str = metadata.filename.name - # make sure the arcname is unique, otherwise zipfile.extractAll will eliminate duplicated ones - while arcname in arcnames: - arcname_idx += 1 - arcname = ( - f"{metadata.filename.stem}_{arcname_idx}{metadata.filename.suffix}" - ) - arcnames.add(arcname) - zipinfo = zipfile.ZipInfo(arcname, date_time=(1980, 1, 1, 0, 0, 0)) - ziph.writestr(zipinfo, image_bytes) - ziph.comment = json.dumps({"upload_md5sum": upload_md5sum}).encode("utf-8") - assert len(sequence) == len(set(ziph.namelist())) - - -def _extract_upload_md5sum(fp: T.IO[bytes]) -> T.Optional[str]: - with zipfile.ZipFile(fp, "r", zipfile.ZIP_DEFLATED) as ziph: - comment = ziph.comment - if not comment: - return None - try: - upload_md5sum = json.loads(comment.decode("utf-8")).get("upload_md5sum") - except Exception: - return None - if not upload_md5sum: - return None - return str(upload_md5sum) - - def _is_immediate_retry(ex: Exception): if ( isinstance(ex, requests.HTTPError) @@ -361,7 +436,7 @@ def _callback(chunk: bytes, _): return _callback -def _upload_stream( +def _upload_stream_with_retries( upload_service: upload_api_v4.UploadService, fp: T.IO[bytes], event_payload: T.Optional[Progress] = None, diff --git a/tests/unit/test_upload_api_v4.py b/tests/unit/test_upload_api_v4.py index 83e47ad0f..8d9bc884c 100644 --- a/tests/unit/test_upload_api_v4.py +++ b/tests/unit/test_upload_api_v4.py @@ -24,11 +24,27 @@ def test_upload(setup_upload: py.path.local): assert (setup_upload.join("FOOBAR.txt").read_binary()) == content +def test_upload_big_chunksize(setup_upload: py.path.local): + upload_service = upload_api_v4.FakeUploadService( + user_access_token="TEST", + session_key="FOOBAR.txt", + chunk_size=1000, + ) + upload_service._error_ratio = 0 + content = b"double_foobar" + cluster_id = 
upload_service.upload(io.BytesIO(content)) + assert isinstance(cluster_id, str), cluster_id + assert (setup_upload.join("FOOBAR.txt").read_binary()) == content + + # reupload should not affect the file + upload_service.upload(io.BytesIO(content)) + assert (setup_upload.join("FOOBAR.txt").read_binary()) == content + + def test_upload_chunks(setup_upload: py.path.local): upload_service = upload_api_v4.FakeUploadService( user_access_token="TEST", session_key="FOOBAR2.txt", - chunk_size=1, ) upload_service._error_ratio = 0 diff --git a/tests/unit/test_uploader.py b/tests/unit/test_uploader.py index fd6cc304a..bd10bf52b 100644 --- a/tests/unit/test_uploader.py +++ b/tests/unit/test_uploader.py @@ -30,6 +30,9 @@ def _validate_zip_dir(zip_dir: py.path.local): descs = [] for zip_path in zip_dir.listdir(): with zipfile.ZipFile(zip_path) as ziph: + filename = ziph.testzip() + assert filename is None, f"Corrupted zip {zip_path}: {filename}" + upload_md5sum = json.loads(ziph.comment).get("upload_md5sum") assert str(os.path.basename(zip_path)) == f"mly_tools_{upload_md5sum}.zip", ( zip_path @@ -161,9 +164,8 @@ def test_upload_zip( }, ] zip_dir = setup_unittest_data.mkdir("zip_dir") - uploader.zip_images( - [types.from_desc(T.cast(T.Any, desc)) for desc in descs], Path(zip_dir) - ) + sequence = [types.from_desc(T.cast(T.Any, desc)) for desc in descs] + uploader.ZipFileSequence.zip_images(sequence, Path(zip_dir)) assert len(zip_dir.listdir()) == 2, list(zip_dir.listdir()) descs = _validate_zip_dir(zip_dir) assert 3 == len(descs) @@ -184,6 +186,62 @@ def test_upload_zip( assert 3 == len(descs) +def test_upload_zip_chunks( + setup_unittest_data: py.path.local, + setup_upload: py.path.local, + emitter=None, +): + test_exif = setup_unittest_data.join("test_exif.jpg") + setup_unittest_data.join("another_directory").mkdir() + test_exif2 = setup_unittest_data.join("another_directory").join("test_exif.jpg") + test_exif.copy(test_exif2) + + descs: T.List[types.DescriptionOrError] = [ + { + "MAPLatitude": 58.5927694, + "MAPLongitude": 16.1840944, + "MAPCaptureTime": "2021_02_13_13_24_41_140", + "filename": str(test_exif), + "md5sum": None, + "filetype": "image", + "MAPSequenceUUID": "sequence_1", + }, + { + "MAPLatitude": 54.5927694, + "MAPLongitude": 16.1840944, + "MAPCaptureTime": "2021_02_13_13_24_41_140", + "filename": str(test_exif2), + "md5sum": None, + "filetype": "image", + "MAPSequenceUUID": "sequence_1", + }, + ] + zip_dir = setup_unittest_data.mkdir("zip_dir") + sequence = [types.from_desc(T.cast(T.Any, desc)) for desc in descs] + uploader.ZipFileSequence.zip_images(sequence, Path(zip_dir)) + assert len(zip_dir.listdir()) == 1, list(zip_dir.listdir()) + with zip_dir.listdir()[0].open("rb") as fp: + zip_data = fp.read() + descs = _validate_zip_dir(zip_dir) + assert 2 == len(descs) + + upload_md5sum = types.sequence_md5sum(sequence) + + chunks = uploader.ZipFileSequence.generate_zip_chunks(sequence, upload_md5sum) + + zip_bytes = b"" + for chunk in chunks: + # print("len(chunk)", len(chunk)) + zip_bytes += chunk + + # print("chunks", len(zip_bytes), len(zip_data)) + # assert zip_bytes == zip_data + import zipfile + import io + + x = zipfile.ZipFile(io.BytesIO(zip_data)).testzip() + + def test_upload_blackvue( tmpdir: py.path.local, setup_upload: py.path.local, From a1ab7609e7b4e9b53561eff3a564c05d90805bdb Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Wed, 26 Mar 2025 12:10:27 -0700 Subject: [PATCH 2/6] remove generate_zip_chunks --- mapillary_tools/uploader.py | 43 ---------------------------- 
tests/unit/test_uploader.py | 56 ------------------------------------- 2 files changed, 99 deletions(-) diff --git a/mapillary_tools/uploader.py b/mapillary_tools/uploader.py index 38fba6d87..297cbe3b7 100644 --- a/mapillary_tools/uploader.py +++ b/mapillary_tools/uploader.py @@ -89,50 +89,7 @@ def emit(self, event: EventName, *args, **kwargs): callback(*args, **kwargs) -class MyBytesIO(io.BufferedIOBase): - def __init__(self) -> None: - self._tell = 0 - self._buffer = io.BytesIO() - - def write(self, data: bytes | Buffer) -> int: - n = self._buffer.write(data) - self._tell += n - return n - - def pop(self) -> bytes: - data = self._buffer.getvalue() - self._buffer = io.BytesIO() - return data - - def flush(self): - self._buffer = io.BytesIO() - - def tell(self) -> int: - return self._tell - - class ZipFileSequence: - @classmethod - def generate_zip_chunks( - cls, - sequence: T.Sequence[types.ImageMetadata], - upload_md5sum: str, - ) -> T.Generator[bytes, None, None]: - _sequences = types.group_and_sort_images(sequence) - assert len(_sequences) == 1, ( - f"Only one sequence is allowed but got {len(_sequences)}: {list(_sequences.keys())}" - ) - - zip_fp = MyBytesIO() - with zipfile.ZipFile(zip_fp, "w", zipfile.ZIP_DEFLATED) as zipf: - arcnames: T.Set[str] = set() - for metadata in sequence: - cls._write_imagebytes_in_zip(zipf, metadata, arcnames=arcnames) - yield zip_fp.pop() - zipf.comment = json.dumps({"upload_md5sum": upload_md5sum}).encode("utf-8") - yield zip_fp.pop() - yield zip_fp.pop() - @classmethod def zip_images( cls, metadatas: T.Sequence[types.ImageMetadata], zip_dir: Path diff --git a/tests/unit/test_uploader.py b/tests/unit/test_uploader.py index bd10bf52b..4e8d57108 100644 --- a/tests/unit/test_uploader.py +++ b/tests/unit/test_uploader.py @@ -186,62 +186,6 @@ def test_upload_zip( assert 3 == len(descs) -def test_upload_zip_chunks( - setup_unittest_data: py.path.local, - setup_upload: py.path.local, - emitter=None, -): - test_exif = setup_unittest_data.join("test_exif.jpg") - setup_unittest_data.join("another_directory").mkdir() - test_exif2 = setup_unittest_data.join("another_directory").join("test_exif.jpg") - test_exif.copy(test_exif2) - - descs: T.List[types.DescriptionOrError] = [ - { - "MAPLatitude": 58.5927694, - "MAPLongitude": 16.1840944, - "MAPCaptureTime": "2021_02_13_13_24_41_140", - "filename": str(test_exif), - "md5sum": None, - "filetype": "image", - "MAPSequenceUUID": "sequence_1", - }, - { - "MAPLatitude": 54.5927694, - "MAPLongitude": 16.1840944, - "MAPCaptureTime": "2021_02_13_13_24_41_140", - "filename": str(test_exif2), - "md5sum": None, - "filetype": "image", - "MAPSequenceUUID": "sequence_1", - }, - ] - zip_dir = setup_unittest_data.mkdir("zip_dir") - sequence = [types.from_desc(T.cast(T.Any, desc)) for desc in descs] - uploader.ZipFileSequence.zip_images(sequence, Path(zip_dir)) - assert len(zip_dir.listdir()) == 1, list(zip_dir.listdir()) - with zip_dir.listdir()[0].open("rb") as fp: - zip_data = fp.read() - descs = _validate_zip_dir(zip_dir) - assert 2 == len(descs) - - upload_md5sum = types.sequence_md5sum(sequence) - - chunks = uploader.ZipFileSequence.generate_zip_chunks(sequence, upload_md5sum) - - zip_bytes = b"" - for chunk in chunks: - # print("len(chunk)", len(chunk)) - zip_bytes += chunk - - # print("chunks", len(zip_bytes), len(zip_data)) - # assert zip_bytes == zip_data - import zipfile - import io - - x = zipfile.ZipFile(io.BytesIO(zip_data)).testzip() - - def test_upload_blackvue( tmpdir: py.path.local, setup_upload: py.path.local, From 
52cf8a9a32874beeab811b29d670a73226f908c4 Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Wed, 26 Mar 2025 12:23:56 -0700 Subject: [PATCH 3/6] update types --- mapillary_tools/uploader.py | 37 +++++++++++++++++++------------------ 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/mapillary_tools/uploader.py b/mapillary_tools/uploader.py index 297cbe3b7..c8ae89f2f 100644 --- a/mapillary_tools/uploader.py +++ b/mapillary_tools/uploader.py @@ -1,4 +1,5 @@ -from collections.abc import Buffer +from __future__ import annotations + import io import json import logging @@ -73,7 +74,7 @@ class UploadCancelled(Exception): class EventEmitter: - events: T.Dict[EventName, T.List] + events: dict[EventName, T.List] def __init__(self): self.events = {} @@ -128,14 +129,14 @@ def zip_sequence_fp( ) with zipfile.ZipFile(zip_fp, "w", zipfile.ZIP_DEFLATED) as zipf: - arcnames: T.Set[str] = set() + arcnames: set[str] = set() for metadata in sequence: cls._write_imagebytes_in_zip(zipf, metadata, arcnames=arcnames) assert len(sequence) == len(set(zipf.namelist())) zipf.comment = json.dumps({"upload_md5sum": upload_md5sum}).encode("utf-8") @classmethod - def _uniq_arcname(cls, filename: Path, arcnames: T.Set[str]): + def _uniq_arcname(cls, filename: Path, arcnames: set[str]): arcname: str = filename.name # make sure the arcname is unique, otherwise zipfile.extractAll will eliminate duplicated ones @@ -151,7 +152,7 @@ def _write_imagebytes_in_zip( cls, zipf: zipfile.ZipFile, metadata: types.ImageMetadata, - arcnames: T.Optional[T.Set[str]] = None, + arcnames: set[str] | None = None, ): if arcnames is None: arcnames = set() @@ -170,7 +171,7 @@ def _write_imagebytes_in_zip( zipf.writestr(zipinfo, image_bytes) @classmethod - def _extract_upload_md5sum(cls, zip_fp: T.IO[bytes]) -> T.Optional[str]: + def _extract_upload_md5sum(cls, zip_fp: T.IO[bytes]) -> str | None: with zipfile.ZipFile(zip_fp, "r", zipfile.ZIP_DEFLATED) as ziph: comment = ziph.comment if not comment: @@ -188,7 +189,7 @@ class Uploader: def __init__( self, user_items: types.UserItem, - emitter: T.Optional[EventEmitter] = None, + emitter: EventEmitter | None = None, chunk_size: int = upload_api_v4.DEFAULT_CHUNK_SIZE, dry_run=False, ): @@ -201,8 +202,8 @@ def __init__( def upload_zipfile( self, zip_path: Path, - event_payload: T.Optional[Progress] = None, - ) -> T.Optional[str]: + event_payload: Progress | None = None, + ) -> str | None: if event_payload is None: event_payload = {} @@ -235,14 +236,14 @@ def upload_zipfile( def upload_images( self, image_metadatas: T.Sequence[types.ImageMetadata], - event_payload: T.Optional[Progress] = None, - ) -> T.Dict[str, str]: + event_payload: Progress | None = None, + ) -> dict[str, str]: if event_payload is None: event_payload = {} _validate_metadatas(image_metadatas) sequences = types.group_and_sort_images(image_metadatas) - ret: T.Dict[str, str] = {} + ret: dict[str, str] = {} for sequence_idx, (sequence_uuid, sequence) in enumerate(sequences.items()): final_event_payload: Progress = { **event_payload, # type: ignore @@ -271,15 +272,15 @@ def upload_stream( fp: T.IO[bytes], cluster_filetype: upload_api_v4.ClusterFileType, upload_md5sum: str, - event_payload: T.Optional[Progress] = None, - ) -> T.Optional[str]: + event_payload: Progress | None = None, + ) -> str | None: if event_payload is None: event_payload = {} fp.seek(0, io.SEEK_END) entity_size = fp.tell() - SUFFIX_MAP: T.Dict[upload_api_v4.ClusterFileType, str] = { + SUFFIX_MAP: dict[upload_api_v4.ClusterFileType, str] = { 
upload_api_v4.ClusterFileType.ZIP: ".zip", upload_api_v4.ClusterFileType.CAMM: ".mp4", upload_api_v4.ClusterFileType.BLACKVUE: ".mp4", @@ -396,8 +397,8 @@ def _callback(chunk: bytes, _): def _upload_stream_with_retries( upload_service: upload_api_v4.UploadService, fp: T.IO[bytes], - event_payload: T.Optional[Progress] = None, - emitter: T.Optional[EventEmitter] = None, + event_payload: Progress | None = None, + emitter: EventEmitter | None = None, ) -> str: retries = 0 @@ -416,7 +417,7 @@ def _reset_retries(_, __): while True: fp.seek(0, io.SEEK_SET) - begin_offset: T.Optional[int] = None + begin_offset: int | None = None try: begin_offset = upload_service.fetch_offset() upload_service.callbacks = [_reset_retries] From 66fbb5cf895db33a26f1d34966db5625ddb63e57 Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Wed, 26 Mar 2025 14:19:39 -0700 Subject: [PATCH 4/6] fix --- mapillary_tools/types.py | 3 ++- mapillary_tools/upload.py | 2 +- mapillary_tools/uploader.py | 30 ++++++++++++++---------------- 3 files changed, 17 insertions(+), 18 deletions(-) diff --git a/mapillary_tools/types.py b/mapillary_tools/types.py index b886bf3ec..8b1a56b5d 100644 --- a/mapillary_tools/types.py +++ b/mapillary_tools/types.py @@ -729,9 +729,10 @@ def group_and_sort_images( return sorted_sequences_by_uuid -def sequence_md5sum(sequence: T.Iterable[ImageMetadata]) -> str: +def update_sequence_md5sum(sequence: T.Iterable[ImageMetadata]) -> str: md5 = hashlib.md5() for metadata in sequence: + metadata.update_md5sum() assert isinstance(metadata.md5sum, str), "md5sum should be calculated" md5.update(metadata.md5sum.encode("utf-8")) return md5.hexdigest() diff --git a/mapillary_tools/upload.py b/mapillary_tools/upload.py index 98b031d72..d2ae1c74b 100644 --- a/mapillary_tools/upload.py +++ b/mapillary_tools/upload.py @@ -136,7 +136,7 @@ def zip_images( metadata for metadata in metadatas if isinstance(metadata, types.ImageMetadata) ] - uploader.ZipFileSequence.zip_sequence(image_metadatas, zip_dir) + uploader.ZipFileSequence.zip_images(image_metadatas, zip_dir) def fetch_user_items( diff --git a/mapillary_tools/uploader.py b/mapillary_tools/uploader.py index c8ae89f2f..e5d09905a 100644 --- a/mapillary_tools/uploader.py +++ b/mapillary_tools/uploader.py @@ -101,33 +101,32 @@ def zip_images( _validate_metadatas(metadatas) sequences = types.group_and_sort_images(metadatas) os.makedirs(zip_dir, exist_ok=True) - for sequence_uuid, sequence in sequences.items(): - # md5sum - for metadata in sequence: - metadata.update_md5sum() - upload_md5sum = types.sequence_md5sum(sequence) + for sequence_uuid, sequence in sequences.items(): # For atomicity we write into a WIP file and then rename to the final file wip_zip_filename = zip_dir.joinpath( f".mly_zip_{uuid.uuid4()}_{sequence_uuid}_{os.getpid()}_{int(time.time())}" ) + upload_md5sum = types.update_sequence_md5sum(sequence) zip_filename = zip_dir.joinpath(f"mly_tools_{upload_md5sum}.zip") with wip_file_context(wip_zip_filename, zip_filename) as wip_path: with wip_path.open("wb") as wip_fp: - cls.zip_sequence_fp(sequence, wip_fp, upload_md5sum) + actual_md5sum = cls._zip_sequence_fp(sequence, wip_fp) + assert actual_md5sum == upload_md5sum @classmethod - def zip_sequence_fp( + def _zip_sequence_fp( cls, sequence: T.Sequence[types.ImageMetadata], zip_fp: T.IO[bytes], - upload_md5sum: str, - ) -> None: - _sequences = types.group_and_sort_images(sequence) - assert len(_sequences) == 1, ( - f"Only one sequence is allowed but got {len(_sequences)}: {list(_sequences.keys())}" + ) -> str: + 
sequence_groups = types.group_and_sort_images(sequence) + assert len(sequence_groups) == 1, ( + f"Only one sequence is allowed but got {len(sequence_groups)}: {list(sequence_groups.keys())}" ) + upload_md5sum = types.update_sequence_md5sum(sequence) + with zipfile.ZipFile(zip_fp, "w", zipfile.ZIP_DEFLATED) as zipf: arcnames: set[str] = set() for metadata in sequence: @@ -135,6 +134,8 @@ def zip_sequence_fp( assert len(sequence) == len(set(zipf.namelist())) zipf.comment = json.dumps({"upload_md5sum": upload_md5sum}).encode("utf-8") + return upload_md5sum + @classmethod def _uniq_arcname(cls, filename: Path, arcnames: set[str]): arcname: str = filename.name @@ -252,11 +253,8 @@ def upload_images( "sequence_image_count": len(sequence), "sequence_uuid": sequence_uuid, } - for metadata in sequence: - metadata.update_md5sum() - upload_md5sum = types.sequence_md5sum(sequence) with tempfile.NamedTemporaryFile() as fp: - ZipFileSequence.zip_sequence_fp(sequence, fp, upload_md5sum) + upload_md5sum = ZipFileSequence._zip_sequence_fp(sequence, fp) cluster_id = self.upload_stream( fp, upload_api_v4.ClusterFileType.ZIP, From ca895f4f1a11eea74b9b3900278038c98caf3e5c Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Wed, 26 Mar 2025 14:28:56 -0700 Subject: [PATCH 5/6] move around --- mapillary_tools/upload.py | 2 +- mapillary_tools/uploader.py | 38 ++++++++++++++++++------------------- tests/unit/test_uploader.py | 2 +- 3 files changed, 21 insertions(+), 21 deletions(-) diff --git a/mapillary_tools/upload.py b/mapillary_tools/upload.py index d2ae1c74b..a3969918e 100644 --- a/mapillary_tools/upload.py +++ b/mapillary_tools/upload.py @@ -136,7 +136,7 @@ def zip_images( metadata for metadata in metadatas if isinstance(metadata, types.ImageMetadata) ] - uploader.ZipFileSequence.zip_images(image_metadatas, zip_dir) + uploader.ZipImageSequence.zip_images(image_metadatas, zip_dir) def fetch_user_items( diff --git a/mapillary_tools/uploader.py b/mapillary_tools/uploader.py index e5d09905a..be2e899e0 100644 --- a/mapillary_tools/uploader.py +++ b/mapillary_tools/uploader.py @@ -90,7 +90,7 @@ def emit(self, event: EventName, *args, **kwargs): callback(*args, **kwargs) -class ZipFileSequence: +class ZipImageSequence: @classmethod def zip_images( cls, metadatas: T.Sequence[types.ImageMetadata], zip_dir: Path @@ -111,11 +111,11 @@ def zip_images( zip_filename = zip_dir.joinpath(f"mly_tools_{upload_md5sum}.zip") with wip_file_context(wip_zip_filename, zip_filename) as wip_path: with wip_path.open("wb") as wip_fp: - actual_md5sum = cls._zip_sequence_fp(sequence, wip_fp) + actual_md5sum = cls.zip_sequence_fp(sequence, wip_fp) assert actual_md5sum == upload_md5sum @classmethod - def _zip_sequence_fp( + def zip_sequence_fp( cls, sequence: T.Sequence[types.ImageMetadata], zip_fp: T.IO[bytes], @@ -136,6 +136,20 @@ def _zip_sequence_fp( return upload_md5sum + @classmethod + def extract_upload_md5sum(cls, zip_fp: T.IO[bytes]) -> str | None: + with zipfile.ZipFile(zip_fp, "r", zipfile.ZIP_DEFLATED) as ziph: + comment = ziph.comment + if not comment: + return None + try: + upload_md5sum = json.loads(comment.decode("utf-8")).get("upload_md5sum") + except Exception: + return None + if not upload_md5sum: + return None + return str(upload_md5sum) + @classmethod def _uniq_arcname(cls, filename: Path, arcnames: set[str]): arcname: str = filename.name @@ -171,20 +185,6 @@ def _write_imagebytes_in_zip( zipinfo = zipfile.ZipInfo(arcname, date_time=(1980, 1, 1, 0, 0, 0)) zipf.writestr(zipinfo, image_bytes) - @classmethod - def 
_extract_upload_md5sum(cls, zip_fp: T.IO[bytes]) -> str | None: - with zipfile.ZipFile(zip_fp, "r", zipfile.ZIP_DEFLATED) as ziph: - comment = ziph.comment - if not comment: - return None - try: - upload_md5sum = json.loads(comment.decode("utf-8")).get("upload_md5sum") - except Exception: - return None - if not upload_md5sum: - return None - return str(upload_md5sum) - class Uploader: def __init__( @@ -220,7 +220,7 @@ def upload_zipfile( } with zip_path.open("rb") as zip_fp: - upload_md5sum = ZipFileSequence._extract_upload_md5sum(zip_fp) + upload_md5sum = ZipImageSequence.extract_upload_md5sum(zip_fp) if upload_md5sum is None: with zip_path.open("rb") as zip_fp: @@ -254,7 +254,7 @@ def upload_images( "sequence_uuid": sequence_uuid, } with tempfile.NamedTemporaryFile() as fp: - upload_md5sum = ZipFileSequence._zip_sequence_fp(sequence, fp) + upload_md5sum = ZipImageSequence.zip_sequence_fp(sequence, fp) cluster_id = self.upload_stream( fp, upload_api_v4.ClusterFileType.ZIP, diff --git a/tests/unit/test_uploader.py b/tests/unit/test_uploader.py index 4e8d57108..da1aed0e8 100644 --- a/tests/unit/test_uploader.py +++ b/tests/unit/test_uploader.py @@ -165,7 +165,7 @@ def test_upload_zip( ] zip_dir = setup_unittest_data.mkdir("zip_dir") sequence = [types.from_desc(T.cast(T.Any, desc)) for desc in descs] - uploader.ZipFileSequence.zip_images(sequence, Path(zip_dir)) + uploader.ZipImageSequence.zip_images(sequence, Path(zip_dir)) assert len(zip_dir.listdir()) == 2, list(zip_dir.listdir()) descs = _validate_zip_dir(zip_dir) assert 3 == len(descs) From f8cc4085d60d9b22660d323a481ab7d91482722b Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Wed, 26 Mar 2025 14:36:11 -0700 Subject: [PATCH 6/6] comments --- mapillary_tools/uploader.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/mapillary_tools/uploader.py b/mapillary_tools/uploader.py index be2e899e0..c9ae220d6 100644 --- a/mapillary_tools/uploader.py +++ b/mapillary_tools/uploader.py @@ -120,6 +120,10 @@ def zip_sequence_fp( sequence: T.Sequence[types.ImageMetadata], zip_fp: T.IO[bytes], ) -> str: + """ + Write a sequence of ImageMetadata into the zipfile handle. + The sequence has to be one sequence and sorted. + """ sequence_groups = types.group_and_sort_images(sequence) assert len(sequence_groups) == 1, ( f"Only one sequence is allowed but got {len(sequence_groups)}: {list(sequence_groups.keys())}"
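
--

Usage note (a sketch, not part of the patches): after PATCH 5/6 the zip workflow is grouped under ZipImageSequence, and Uploader.upload_zipfile recovers the sequence md5sum from the zip comment via extract_upload_md5sum. Below is a minimal sketch of the resulting flow. The desc dict mirrors tests/unit/test_uploader.py; the image path and the UserItem contents are placeholders (normally user_items comes from fetch_user_items), and dry_run=True avoids real uploads.

    import typing as T
    from pathlib import Path

    from mapillary_tools import types, uploader

    # Image description as in tests/unit/test_uploader.py; "filename" is a
    # placeholder and must point at a real JPEG with EXIF data.
    desc = {
        "MAPLatitude": 58.5927694,
        "MAPLongitude": 16.1840944,
        "MAPCaptureTime": "2021_02_13_13_24_41_140",
        "filename": "data/test_exif.jpg",
        "md5sum": None,
        "filetype": "image",
        "MAPSequenceUUID": "sequence_1",
    }
    image_metadatas = [types.from_desc(T.cast(T.Any, desc))]

    # Writes one mly_tools_<upload_md5sum>.zip per sequence; the md5sum is
    # embedded in the zip comment by zip_sequence_fp.
    zip_dir = Path("zip_dir")
    uploader.ZipImageSequence.zip_images(image_metadatas, zip_dir)

    # UserItem contents assumed here; normally obtained via fetch_user_items.
    user_items = T.cast(types.UserItem, {"user_upload_token": "TEST"})
    mly_uploader = uploader.Uploader(user_items, dry_run=True)

    for zip_path in sorted(zip_dir.glob("mly_tools_*.zip")):
        # upload_zipfile reads upload_md5sum back from the zip comment via
        # ZipImageSequence.extract_upload_md5sum, falling back to hashing
        # the file, then streams the zip as one cluster upload.
        cluster_id = mly_uploader.upload_zipfile(zip_path)
        print(zip_path.name, "->", cluster_id)

The split between zip_images (atomic write to disk through a WIP file) and zip_sequence_fp (write to any file handle, returning the md5sum it embedded) is what lets upload_images zip straight into a NamedTemporaryFile without duplicating the md5sum bookkeeping.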