diff --git a/mapillary_tools/video_data_extraction/extractors/camm_parser.py b/mapillary_tools/video_data_extraction/extractors/camm_parser.py index 79cff2bf..4faa2c26 100644 --- a/mapillary_tools/video_data_extraction/extractors/camm_parser.py +++ b/mapillary_tools/video_data_extraction/extractors/camm_parser.py @@ -13,8 +13,12 @@ class CammParser(BaseParser): parser_label = "camm" @functools.cached_property - def __camera_info(self) -> T.Tuple[str, str]: - with self.videoPath.open("rb") as fp: + def _camera_info(self) -> T.Tuple[str, str]: + source_path = self.geotag_source_path + if not source_path: + return "", "" + + with source_path.open("rb") as fp: return camm_parser.extract_camera_make_and_model(fp) def extract_points(self) -> T.Sequence[geo.Point]: @@ -28,15 +32,7 @@ def extract_points(self) -> T.Sequence[geo.Point]: return [] def extract_make(self) -> T.Optional[str]: - source_path = self.geotag_source_path - if not source_path: - return None - with source_path.open("rb") as _fp: - return self.__camera_info[0] or None + return self._camera_info[0] or None def extract_model(self) -> T.Optional[str]: - source_path = self.geotag_source_path - if not source_path: - return None - with source_path.open("rb") as _fp: - return self.__camera_info[1] or None + return self._camera_info[1] or None diff --git a/mapillary_tools/video_data_extraction/extractors/gpx_parser.py b/mapillary_tools/video_data_extraction/extractors/gpx_parser.py index ce3287cd..60e23c1b 100644 --- a/mapillary_tools/video_data_extraction/extractors/gpx_parser.py +++ b/mapillary_tools/video_data_extraction/extractors/gpx_parser.py @@ -38,40 +38,71 @@ def extract_points(self) -> T.Sequence[geo.Point]: if not gpx_points: return gpx_points + offset = self._synx_gpx_by_first_gps_timestamp(gpx_points) + + self._rebase_times(gpx_points, offset=offset) + + return gpx_points + + def _synx_gpx_by_first_gps_timestamp( + self, gpx_points: T.Sequence[geo.Point] + ) -> float: + offset: float = 0.0 + + if not gpx_points: + return offset + first_gpx_dt = datetime.datetime.fromtimestamp( gpx_points[0].time, tz=datetime.timezone.utc ) LOG.info("First GPX timestamp: %s", first_gpx_dt) # Extract first GPS timestamp (if found) for synchronization - offset: float = 0.0 - parser = GenericVideoParser(self.videoPath, self.options, self.parserOptions) + # Use an empty dictionary to force video parsers to extract make/model from the video metadata itself + parser = GenericVideoParser(self.videoPath, self.options, {}) gps_points = parser.extract_points() - if gps_points: - first_gps_point = gps_points[0] - if isinstance(first_gps_point, telemetry.GPSPoint): - if first_gps_point.epoch_time is not None: - first_gps_dt = datetime.datetime.fromtimestamp( - first_gps_point.epoch_time, tz=datetime.timezone.utc - ) - LOG.info("First GPS timestamp: %s", first_gps_dt) - offset = gpx_points[0].time - first_gps_point.epoch_time - if offset: - LOG.warning( - "Found offset between GPX %s and video GPS timestamps %s: %s seconds", - first_gpx_dt, - first_gps_dt, - offset, - ) - self._rebase_times(gpx_points, offset=offset) + if not gps_points: + LOG.warning( + "Skip GPX synchronization because no GPS found in video %s", + self.videoPath, + ) + return offset - return gpx_points + first_gps_point = gps_points[0] + if isinstance(first_gps_point, telemetry.GPSPoint): + if first_gps_point.epoch_time is not None: + first_gps_dt = datetime.datetime.fromtimestamp( + first_gps_point.epoch_time, tz=datetime.timezone.utc + ) + LOG.info("First GPS timestamp: %s", first_gps_dt) + offset = gpx_points[0].time - first_gps_point.epoch_time + if offset: + LOG.warning( + "Found offset between GPX %s and video GPS timestamps %s: %s seconds", + first_gpx_dt, + first_gps_dt, + offset, + ) + else: + LOG.info( + "GPX and GPS are perfectly synchronized (all starts from %s)", + first_gpx_dt, + ) + else: + LOG.warning( + "Skip GPX synchronization because no GPS epoch time found in video %s", + self.videoPath, + ) + + return offset def extract_make(self) -> T.Optional[str]: - parser = GenericVideoParser(self.videoPath, self.options, self.parserOptions) + # Use an empty dictionary to force video parsers to extract make/model from the video metadata itself + parser = GenericVideoParser(self.videoPath, self.options, {}) return parser.extract_make() def extract_model(self) -> T.Optional[str]: - parser = GenericVideoParser(self.videoPath, self.options, self.parserOptions) + # Use an empty dictionary to force video parsers to extract make/model from the video metadata itself + parser = GenericVideoParser(self.videoPath, self.options, {}) return parser.extract_model()