From 5ed2a6e96af27a43a6b96a8704b03146f3c35d20 Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Sun, 23 Feb 2025 13:03:08 +0700 Subject: [PATCH 1/3] fix: enforce camera info to be extracted from video itself instead of gpx --- .../extractors/camm_parser.py | 20 ++++++++----------- .../extractors/gpx_parser.py | 6 ++++-- 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/mapillary_tools/video_data_extraction/extractors/camm_parser.py b/mapillary_tools/video_data_extraction/extractors/camm_parser.py index 79cff2bf0..4faa2c266 100644 --- a/mapillary_tools/video_data_extraction/extractors/camm_parser.py +++ b/mapillary_tools/video_data_extraction/extractors/camm_parser.py @@ -13,8 +13,12 @@ class CammParser(BaseParser): parser_label = "camm" @functools.cached_property - def __camera_info(self) -> T.Tuple[str, str]: - with self.videoPath.open("rb") as fp: + def _camera_info(self) -> T.Tuple[str, str]: + source_path = self.geotag_source_path + if not source_path: + return "", "" + + with source_path.open("rb") as fp: return camm_parser.extract_camera_make_and_model(fp) def extract_points(self) -> T.Sequence[geo.Point]: @@ -28,15 +32,7 @@ def extract_points(self) -> T.Sequence[geo.Point]: return [] def extract_make(self) -> T.Optional[str]: - source_path = self.geotag_source_path - if not source_path: - return None - with source_path.open("rb") as _fp: - return self.__camera_info[0] or None + return self._camera_info[0] or None def extract_model(self) -> T.Optional[str]: - source_path = self.geotag_source_path - if not source_path: - return None - with source_path.open("rb") as _fp: - return self.__camera_info[1] or None + return self._camera_info[1] or None diff --git a/mapillary_tools/video_data_extraction/extractors/gpx_parser.py b/mapillary_tools/video_data_extraction/extractors/gpx_parser.py index ce3287cd9..293184b02 100644 --- a/mapillary_tools/video_data_extraction/extractors/gpx_parser.py +++ b/mapillary_tools/video_data_extraction/extractors/gpx_parser.py @@ -69,9 +69,11 @@ def extract_points(self) -> T.Sequence[geo.Point]: return gpx_points def extract_make(self) -> T.Optional[str]: - parser = GenericVideoParser(self.videoPath, self.options, self.parserOptions) + # Use an empty dictionary to force video parsers to extract make/model from the video metadata itself + parser = GenericVideoParser(self.videoPath, self.options, {}) return parser.extract_make() def extract_model(self) -> T.Optional[str]: - parser = GenericVideoParser(self.videoPath, self.options, self.parserOptions) + # Use an empty dictionary to force video parsers to extract make/model from the video metadata itself + parser = GenericVideoParser(self.videoPath, self.options, {}) return parser.extract_model() From 4a0d3007b9498d636b225d1a4647c99557f6b317 Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Sun, 23 Feb 2025 13:37:06 +0700 Subject: [PATCH 2/3] refactor --- .../extractors/gpx_parser.py | 21 +++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/mapillary_tools/video_data_extraction/extractors/gpx_parser.py b/mapillary_tools/video_data_extraction/extractors/gpx_parser.py index 293184b02..4b7c87f47 100644 --- a/mapillary_tools/video_data_extraction/extractors/gpx_parser.py +++ b/mapillary_tools/video_data_extraction/extractors/gpx_parser.py @@ -38,6 +38,13 @@ def extract_points(self) -> T.Sequence[geo.Point]: if not gpx_points: return gpx_points + offset = self._synx_gpx_by_first_timestamp(gpx_points) + + self._rebase_times(gpx_points, offset=offset) + + return gpx_points + + def _synx_gpx_by_first_timestamp(self, gpx_points: T.Sequence[geo.Point]) -> float: first_gpx_dt = datetime.datetime.fromtimestamp( gpx_points[0].time, tz=datetime.timezone.utc ) @@ -45,8 +52,11 @@ def extract_points(self) -> T.Sequence[geo.Point]: # Extract first GPS timestamp (if found) for synchronization offset: float = 0.0 - parser = GenericVideoParser(self.videoPath, self.options, self.parserOptions) + + # Use an empty dictionary to force video parsers to extract make/model from the video metadata itself + parser = GenericVideoParser(self.videoPath, self.options, {}) gps_points = parser.extract_points() + if gps_points: first_gps_point = gps_points[0] if isinstance(first_gps_point, telemetry.GPSPoint): @@ -63,10 +73,13 @@ def extract_points(self) -> T.Sequence[geo.Point]: first_gps_dt, offset, ) + else: + LOG.warning( + "Skip GPX sync because no GPS found in video %s", + self.videoPath, + ) - self._rebase_times(gpx_points, offset=offset) - - return gpx_points + return offset def extract_make(self) -> T.Optional[str]: # Use an empty dictionary to force video parsers to extract make/model from the video metadata itself From ebf20d13e3ce419ecc42cb667c0a557cca5e4126 Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Sun, 23 Feb 2025 14:13:07 +0700 Subject: [PATCH 3/3] refactor and logging --- .../extractors/gpx_parser.py | 60 ++++++++++++------- 1 file changed, 38 insertions(+), 22 deletions(-) diff --git a/mapillary_tools/video_data_extraction/extractors/gpx_parser.py b/mapillary_tools/video_data_extraction/extractors/gpx_parser.py index 4b7c87f47..60e23c1bf 100644 --- a/mapillary_tools/video_data_extraction/extractors/gpx_parser.py +++ b/mapillary_tools/video_data_extraction/extractors/gpx_parser.py @@ -38,46 +38,62 @@ def extract_points(self) -> T.Sequence[geo.Point]: if not gpx_points: return gpx_points - offset = self._synx_gpx_by_first_timestamp(gpx_points) + offset = self._synx_gpx_by_first_gps_timestamp(gpx_points) self._rebase_times(gpx_points, offset=offset) return gpx_points - def _synx_gpx_by_first_timestamp(self, gpx_points: T.Sequence[geo.Point]) -> float: + def _synx_gpx_by_first_gps_timestamp( + self, gpx_points: T.Sequence[geo.Point] + ) -> float: + offset: float = 0.0 + + if not gpx_points: + return offset + first_gpx_dt = datetime.datetime.fromtimestamp( gpx_points[0].time, tz=datetime.timezone.utc ) LOG.info("First GPX timestamp: %s", first_gpx_dt) # Extract first GPS timestamp (if found) for synchronization - offset: float = 0.0 - # Use an empty dictionary to force video parsers to extract make/model from the video metadata itself parser = GenericVideoParser(self.videoPath, self.options, {}) gps_points = parser.extract_points() - if gps_points: - first_gps_point = gps_points[0] - if isinstance(first_gps_point, telemetry.GPSPoint): - if first_gps_point.epoch_time is not None: - first_gps_dt = datetime.datetime.fromtimestamp( - first_gps_point.epoch_time, tz=datetime.timezone.utc - ) - LOG.info("First GPS timestamp: %s", first_gps_dt) - offset = gpx_points[0].time - first_gps_point.epoch_time - if offset: - LOG.warning( - "Found offset between GPX %s and video GPS timestamps %s: %s seconds", - first_gpx_dt, - first_gps_dt, - offset, - ) - else: + if not gps_points: LOG.warning( - "Skip GPX sync because no GPS found in video %s", + "Skip GPX synchronization because no GPS found in video %s", self.videoPath, ) + return offset + + first_gps_point = gps_points[0] + if isinstance(first_gps_point, telemetry.GPSPoint): + if first_gps_point.epoch_time is not None: + first_gps_dt = datetime.datetime.fromtimestamp( + first_gps_point.epoch_time, tz=datetime.timezone.utc + ) + LOG.info("First GPS timestamp: %s", first_gps_dt) + offset = gpx_points[0].time - first_gps_point.epoch_time + if offset: + LOG.warning( + "Found offset between GPX %s and video GPS timestamps %s: %s seconds", + first_gpx_dt, + first_gps_dt, + offset, + ) + else: + LOG.info( + "GPX and GPS are perfectly synchronized (all starts from %s)", + first_gpx_dt, + ) + else: + LOG.warning( + "Skip GPX synchronization because no GPS epoch time found in video %s", + self.videoPath, + ) return offset