From a2dacae9c8119ca2ee15430c4934af35d4d7d874 Mon Sep 17 00:00:00 2001
From: Jon Betts
Date: Wed, 19 Jul 2023 16:47:49 +0100
Subject: [PATCH] Add a model for parsing data from YouTube V1 video info API

---
 .../unit/via/services/youtube_api/__init__.py |   0
 .../services/youtube_api/test__nested_data.py |  20 +
 .../via/services/youtube_api/test_models.py   | 341 ++++++++++++++++++
 via/services/youtube_api/__init__.py          |   1 +
 via/services/youtube_api/_nested_data.py      |  13 +
 via/services/youtube_api/models.py            | 315 ++++++++++++++++
 6 files changed, 690 insertions(+)
 create mode 100644 tests/unit/via/services/youtube_api/__init__.py
 create mode 100644 tests/unit/via/services/youtube_api/test__nested_data.py
 create mode 100644 tests/unit/via/services/youtube_api/test_models.py
 create mode 100644 via/services/youtube_api/__init__.py
 create mode 100644 via/services/youtube_api/_nested_data.py
 create mode 100644 via/services/youtube_api/models.py

diff --git a/tests/unit/via/services/youtube_api/__init__.py b/tests/unit/via/services/youtube_api/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/unit/via/services/youtube_api/test__nested_data.py b/tests/unit/via/services/youtube_api/test__nested_data.py
new file mode 100644
index 00000000..bd8c4a01
--- /dev/null
+++ b/tests/unit/via/services/youtube_api/test__nested_data.py
@@ -0,0 +1,20 @@
+import pytest
+from pytest import param
+
+from via.services.youtube_api import safe_get
+
+
+class TestSafeGet:
+    @pytest.mark.parametrize(
+        "data,path,expected",
+        (
+            param({"a": {"b": 1}}, ["a", "b"], 1, id="nested_dict_key"),
+            param({"a": 1}, ["b"], ..., id="missing_dict_key"),
+            param({"a": None}, ["a"], None, id="null_not_default"),
+            param({"a": None}, ["a", "b"], ..., id="dict_key_into_none"),
+            param({"a": [{"b": 1}]}, ["a", 0, "b"], 1, id="array_key"),
+            param({"a": [{"b": 1}]}, ["a", 1, "b"], ..., id="missing_array_key"),
+        ),
+    )
+    def test_it(self, data, path, expected):
+        assert safe_get(data, path, default=...) == expected
diff --git a/tests/unit/via/services/youtube_api/test_models.py b/tests/unit/via/services/youtube_api/test_models.py
new file mode 100644
index 00000000..c0045bc0
--- /dev/null
+++ b/tests/unit/via/services/youtube_api/test_models.py
@@ -0,0 +1,341 @@
+from unittest.mock import sentinel
+
+import pytest
+from h_matchers import Any
+from pytest import param
+
+from via.services.youtube_api import Captions, CaptionTrack, Video, VideoDetails
+
+
+class TestVideoDetails:
+    def test_from_json(self):
+        video_details = VideoDetails.from_json(
+            {
+                "videoId": sentinel.id,
+                "title": sentinel.title,
+                "shortDescription": sentinel.short_description,
+                "author": sentinel.author,
+                # All of this is quite speculative at the moment in terms of whether
+                # it's useful, so we aren't carefully controlling the sub-items
+                "thumbnail": {"thumbnails": sentinel.thumbnails},
+            }
+        )
+
+        assert video_details == Any.instance_of(VideoDetails).with_attrs(
+            {
+                "id": sentinel.id,
+                "title": sentinel.title,
+                "short_description": sentinel.short_description,
+                "author": sentinel.author,
+                "thumbnails": sentinel.thumbnails,
+            }
+        )
+
+
+class TestCaptionTrack:
+    @pytest.mark.parametrize("kind", (None, True))
+    @pytest.mark.parametrize("is_translatable", (False, True))
+    def test_from_json(self, kind, is_translatable):
+        data = {
+            "name": {"simpleText": "English (British) - Name"},
+            "languageCode": "en-GB",
+            "isTranslatable": is_translatable,
+            "baseUrl": sentinel.url,
+        }
+        if kind:
+            data["kind"] = kind
+        if is_translatable:
+            data["isTranslatable"] = True
+
+        caption_track = CaptionTrack.from_json(data)
+
+        assert caption_track == Any.instance_of(CaptionTrack).with_attrs(
+            {
+                "name": "Name",
+                "language_code": "en-gb",
+                "label": "English (British) - Name",
+                "kind": kind,
+                "is_translatable": is_translatable,
+                "base_url": sentinel.url,
+            }
+        )
+
+    CAPTION_TRACK_IDS = (
+        (CaptionTrack(language_code="en"), "en"),
+        (CaptionTrack(language_code="en", kind="asr"), "en.a"),
+        (CaptionTrack(language_code="en", name="Hello"), "en..SGVsbG8="),
+        (CaptionTrack(language_code="en", translated_language_code="fr"), "en...fr"),
+        # This combination isn't actually possible, but let's try everything at
+        # once
+        (
+            CaptionTrack(
+                language_code="en-gb",
+                kind="asr",
+                name="Name",
+                translated_language_code="fr",
+            ),
+            "en-gb.a.TmFtZQ==.fr",
+        ),
+    )
+
+    @pytest.mark.parametrize("caption_track,id_string", CAPTION_TRACK_IDS)
+    def test_from_id(self, caption_track, id_string):
+        assert CaptionTrack.from_id(id_string) == caption_track
+
+    @pytest.mark.parametrize("caption_track,id_string", CAPTION_TRACK_IDS)
+    def test_id(self, caption_track, id_string):
+        assert caption_track.id == id_string
+
+    def test_is_auto_generated(self):
+        caption_track = CaptionTrack("en", kind="asr")
+        assert caption_track.is_auto_generated
+
+        caption_track.kind = None
+        assert not caption_track.is_auto_generated
+
+        caption_track.is_auto_generated = True
+        assert caption_track.is_auto_generated
+        assert caption_track.kind == "asr"
+
+        caption_track.is_auto_generated = False
+        assert not caption_track.is_auto_generated
+        assert not caption_track.kind
+
+    @pytest.mark.parametrize(
+        "caption_track,url",
+        (
+            (
+                CaptionTrack("en", base_url="http://example.com?a=1"),
+                "http://example.com?a=1",
+            ),
+            (
+                CaptionTrack(
+                    "en",
+                    base_url="http://example.com?a=1",
+                    translated_language_code="fr",
+                ),
+                "http://example.com?a=1&tlang=fr",
+            ),
+            (CaptionTrack("en", base_url=None), None),
+            (CaptionTrack("en", base_url=None, translated_language_code="fr"), None),
+        ),
+    )
+    def test_url(self, caption_track, url):
+        assert caption_track.url == url
+
+
+class TestCaptions:
+    def test_from_json(self, CaptionTrack):
+        captions = Captions.from_json(
+            {
+                "captionTracks": [{"track": "fake_dict"}],
+                "translationLanguages": [
+                    {"languageCode": "en", "languageName": "English"},
+                    {"languageCode": "en-GB", "languageName": "English (British)"},
+                ],
+            }
+        )
+
+        CaptionTrack.from_json.assert_called_once_with({"track": "fake_dict"})
+        assert captions == Any.instance_of(Captions).with_attrs(
+            {
+                "tracks": [CaptionTrack.from_json.return_value],
+                "translation_languages": [
+                    {"code": "en", "name": "English"},
+                    {"code": "en-gb", "name": "English (British)"},
+                ],
+            }
+        )
+
+    def test_from_json_minimal(self, CaptionTrack):
+        captions = Captions.from_json({})
+
+        assert not captions.tracks
+        assert not captions.translation_languages
+        CaptionTrack.assert_not_called()
+
+    def test_is_translation_supported(self):
+        captions = Captions(
+            translation_languages=[{"code": "en-gb", "name": "English (British)"}]
+        )
+
+        assert captions.is_translation_supported("en-GB")
+        assert captions.is_translation_supported("en-gb")
+        assert not captions.is_translation_supported("en")
+
+    def test_is_translation_supported_with_no_languages(self):
+        assert not Captions().is_translation_supported("any")
+
+    @pytest.mark.parametrize(
+        "preferences,expected_label",
+        (
+            param(
+                [CaptionTrack("en")],
+                "plain_en",
+                id="direct_match",
+            ),
+            param(
+                [CaptionTrack("de"), CaptionTrack("en-gb")],
+                "plain_en_gb",
+                id="miss_then_hit",
+            ),
+            param(
+                [
+                    CaptionTrack("de"),
+                    CaptionTrack(Any.string.matching("^en-.*"), name="Name"),
+                ],
+                "named_en_gb",
+                id="wild_cards",
+            ),
+            param(
+                [CaptionTrack("fr", kind=None), CaptionTrack("en", kind="asr")],
+                "en_auto",
+                id="fallback_to_auto",
+            ),
+            param(
+                [CaptionTrack(Any(), name="Name")],
+                "named_en_gb",
+                id="same_level_sorting",
+            ),
+            param([CaptionTrack("fr")], None, id="miss"),
+            param(
+                [CaptionTrack("en", translated_language_code="fr")],
+                None,
+                id="translation_without_languages",
+            ),
+        ),
+    )
+    def test_find_matching_track(self, preferences, expected_label):
+        captions = Captions(
+            tracks=[
+                CaptionTrack("en", label="plain_en"),
+                CaptionTrack("en-gb", label="plain_en_gb"),
+                CaptionTrack("en-us", name="Name", label="named_en_us"),
+                CaptionTrack("en-gb", name="Name", label="named_en_gb"),
+                CaptionTrack("en", kind="asr", label="en_auto"),
+            ]
+        )
+
+        caption_track = captions.find_matching_track(preferences)
+
+        assert (
+            caption_track.label == expected_label
+            if expected_label
+            else not caption_track
+        )
+
+    @pytest.mark.parametrize(
+        "desired_language_code,expected_label",
+        (
+            param("en", "plain_de", id="translatable"),
+            param("en-gb", None, id="untranslatable"),
+        ),
+    )
+    def test_find_matching_track_with_translation(
+        self, desired_language_code, expected_label
+    ):
+        captions = Captions(
+            tracks=[
+                CaptionTrack("fr", label="plain_fr", is_translatable=False),
+                CaptionTrack("de", label="plain_de", is_translatable=True),
+            ],
+            translation_languages=[{"code": "en", "name": "English"}],
+        )
+
+        caption_track = captions.find_matching_track(
+            [
+                CaptionTrack(
+                    language_code=Any(),
+                    name=Any(),
+                    kind=Any(),
+                    translated_language_code=desired_language_code,
+                )
+            ]
+        )
+
+        if expected_label:
+            assert caption_track.label == expected_label
+            assert caption_track.translated_language_code == desired_language_code
+        else:
+            assert not caption_track
+
+    @pytest.fixture
+    def CaptionTrack(self, patch):
+        return patch("via.services.youtube_api.models.CaptionTrack")
+
+
+class TestVideo:
+    def test_from_json(self, Captions, VideoDetails):
+        video = Video.from_json(
+            url=sentinel.url,
+            data={
+                "videoDetails": sentinel.video_details,
+                "captions": {"playerCaptionsTracklistRenderer": sentinel.captions},
+                "playabilityStatus": {"status": "OK"},
+            },
+        )
+
+        Captions.from_json.assert_called_once_with(sentinel.captions)
+        VideoDetails.from_json.assert_called_once_with(sentinel.video_details)
+
+        assert video == Any.instance_of(Video).with_attrs(
+            {
+                "caption": Captions.from_json.return_value,
+                "details": VideoDetails.from_json.return_value,
+                "playability_status": "OK",
+                "url": sentinel.url,
+            }
+        )
+
+    def test_from_json_minimal(self):
+        video = Video.from_json(sentinel.url, {})
+
+        assert not video.caption
+        assert not video.details
+        assert not video.playability_status
+
+    @pytest.mark.parametrize(
+        "data,is_playable",
+        (
+            ({"playabilityStatus": {"status": "OK"}}, True),
+            ({"playabilityStatus": {"status": "Other"}}, False),
+            ({"playabilityStatus": None}, False),
+            ({}, False),
+        ),
+    )
+    def test_is_playable(self, data, is_playable):
+        assert Video.from_json(sentinel.url, data).is_playable == is_playable
+
+    @pytest.mark.usefixtures("Captions")
+    @pytest.mark.parametrize(
+        "data,has_captions",
+        (
+            (
+                {"captions": {"playerCaptionsTracklistRenderer": sentinel.captions}},
+                True,
+            ),
+            ({"captions": {"playerCaptionsTracklistRenderer": None}}, False),
+            ({"captions": None}, False),
+            ({}, False),
+        ),
+    )
+    def test_has_captions(self, data, has_captions):
+        assert Video.from_json(sentinel.url, data).has_captions == has_captions
+
+    @pytest.mark.parametrize(
+        "video,expected_id",
+        (
+            (Video(details=VideoDetails(id="1234")), "1234"),
+            (Video(details=None), None),
+        ),
+    )
+    def test_id(self, video, expected_id):
+        assert video.id == expected_id
+
+    @pytest.fixture
+    def Captions(self, patch):
+        return patch("via.services.youtube_api.models.Captions")
+
+    @pytest.fixture
+    def VideoDetails(self, patch):
+        return patch("via.services.youtube_api.models.VideoDetails")
diff --git a/via/services/youtube_api/__init__.py b/via/services/youtube_api/__init__.py
new file mode 100644
index 00000000..a34d393e
--- /dev/null
+++ b/via/services/youtube_api/__init__.py
@@ -0,0 +1 @@
+from via.services.youtube_api.models import *
diff --git a/via/services/youtube_api/_nested_data.py b/via/services/youtube_api/_nested_data.py
new file mode 100644
index 00000000..85319b07
--- /dev/null
+++ b/via/services/youtube_api/_nested_data.py
@@ -0,0 +1,13 @@
+from typing import Iterable
+
+
+def safe_get(data, path: Iterable, default=None):
+    """Get deeply nested items without exploding."""
+
+    for key in path:
+        try:
+            data = data[key]
+        except (KeyError, IndexError, TypeError):
+            return default
+
+    return data
diff --git a/via/services/youtube_api/models.py b/via/services/youtube_api/models.py
new file mode 100644
index 00000000..95d5b8d0
--- /dev/null
+++ b/via/services/youtube_api/models.py
@@ -0,0 +1,315 @@
+import base64
+from copy import deepcopy
+from dataclasses import dataclass, field
+from itertools import zip_longest
+from operator import attrgetter
+from typing import List, Optional
+
+from via.services.youtube_api._nested_data import safe_get
+
+
+@dataclass
+class VideoDetails:
+    """Metadata for the video."""
+
+    id: Optional[str] = None
+    title: Optional[str] = None
+    short_description: Optional[str] = None
+    author: Optional[str] = None
+    thumbnails: Optional[List[dict]] = None
+
+    @classmethod
+    def from_json(cls, data):
+        """Create an instance from the `videoDetails` section of JSON."""
+
+        return cls(
+            id=data["videoId"],
+            title=data["title"],
+            short_description=data["shortDescription"],
+            author=data["author"],
+            thumbnails=data["thumbnail"]["thumbnails"],
+        )
+
+
+@dataclass
+class CaptionTrack:
+    """A source of transcription data, in a particular language."""
+
+    # Items which form the unique part
+    language_code: str
+    """Original language of the track."""
+
+    name: Optional[str] = None
+    """Human set name for the track."""
+
+    kind: Optional[str] = None
+    """Kind of track: "asr" if automatically generated by audio to text AI."""
+
+    translated_language_code: Optional[str] = None
+    """Language to machine translate this into. Set this manually."""
+
+    # Other items which we cannot determine from the id
+    label: Optional[str] = None
+    """Human readable name (determined by language + name)."""
+
+    is_translatable: Optional[bool] = None
+    """Can this be translated into other languages?"""
+
+    base_url: Optional[str] = None
+    """URL to download the original language text (as XML)."""
+
+    @classmethod
+    def from_json(cls, data: dict):
+        """Create an instance from a `captionTrack` section of JSON."""
+
+        label = data["name"]["simpleText"]
+
+        return cls(
+            name=label.split(" - ", 1)[-1] if " - " in label else None,
+            language_code=data["languageCode"].lower(),
+            label=label,
+            kind=data.get("kind", None),
+            is_translatable=data.get("isTranslatable", False),
+            base_url=data["baseUrl"],
+        )
+
+    @classmethod
+    def from_id(cls, id_string: str):
+        """Create a partially filled out track from an id string."""
+
+        data = dict(
+            zip_longest(
+                [
+                    "language_code",
+                    "auto_generated",
+                    "name",
+                    "translated_language_code",
+                ],
+                [part or None for part in id_string.split(".")],
+            )
+        )
+
+        if name := data.get("name"):
+            data["name"] = base64.b64decode(name.encode("utf-8")).decode("utf-8")
+
+        if data.pop("auto_generated", None):
+            data["kind"] = "asr"
+
+        return cls(**data)
+
+    @property
+    def id(self) -> str:
+        if self.name:
+            # Ensure our ids don't contain wild characters
+            name = base64.b64encode(self.name.encode("utf-8")).decode("utf-8")
+        else:
+            name = None
+
+        return ".".join(
+            part or ""
+            for part in [
+                self.language_code,
+                "a" if self.is_auto_generated else None,
+                name,
+                self.translated_language_code,
+            ]
+        ).rstrip(".")
+
+    @property
+    def is_auto_generated(self) -> bool:
+        """Is this caption track auto generated?"""
+
+        return self.kind == "asr"
+
+    @is_auto_generated.setter
+    def is_auto_generated(self, value: bool):
+        self.kind = "asr" if value else None
+
+    @property
+    def url(self) -> Optional[str]:
+        """Get the URL to download a transcript of this caption track."""
+        if not self.base_url:
+            return None
+
+        url = self.base_url
+
+        if self.translated_language_code:
+            url += f"&tlang={self.translated_language_code}"
+
+        return url
+
+
+@dataclass
+class Captions:
+    """All information about captions."""
+
+    tracks: List[CaptionTrack] = field(default_factory=list)
+    """Available tracks to pick from."""
+
+    translation_languages: List[dict] = field(default_factory=list)
+    """Languages the tracks marked as translatable can be translated to."""
+
+    @classmethod
+    def from_json(cls, data: dict):
+        """Create an instance from JSON.
+
+        This is populated from the `captions.playerCaptionsTracklistRenderer`
+        section.
+        """
+
+        return cls(
+            tracks=[
+                CaptionTrack.from_json(track) for track in data.get("captionTracks", [])
+            ],
+            translation_languages=[
+                {
+                    "code": language["languageCode"].lower(),
+                    "name": language["languageName"],
+                }
+                for language in data.get("translationLanguages", [])
+            ],
+        )
+
+    def is_translation_supported(self, language_code: str) -> bool:
+        """Can we translate caption tracks into this language?"""
+
+        if not self.translation_languages:
+            return False
+
+        language_code = language_code.lower()
+        return any(
+            language["code"] == language_code for language in self.translation_languages
+        )
+
+    def find_matching_track(
+        self, preferences: List[CaptionTrack]
+    ) -> Optional[CaptionTrack]:
+        """
+        Get the caption track which best matches the preferences, in order.
+
+        This method takes the provided list of caption track objects and
+        searches the available tracks for those with matching details:
+
+        * language_code
+        * name
+        * is_auto_generated / kind
+        * translated_language_code
+
+        For a match to happen, we must match the first three items, and be
+        translatable to the last if present.
+
+        Earlier items are higher priority.
+
+        :param preferences: List of partially filled out caption track objects
+            which represent the caption track we would like.
+        :return: A copy of the best matching track, or None if nothing matches
+        """
+
+        def get_key(track: CaptionTrack):
+            return track.language_code, track.kind, track.name
+
+        search_keys = [get_key(preference) for preference in preferences]
+        best_index, best_preference, best_caption_track = None, None, None
+
+        # Sort the tracks to keep the algorithm more stable! This only insulates
+        # us from sorting changes, not metadata changes.
+        for caption_track in sorted(self.tracks, key=attrgetter("id")):
+            track_key = get_key(caption_track)
+
+            for index, preference in enumerate(preferences):
+                # Items with lower indexes are first choices for the user, so
+                # stop once a preference can no longer beat the current best
+                if best_index is not None and index >= best_index:
+                    break
+
+                wanted_language = preference.translated_language_code
+
+                # If we match, but we want to be translated, check we can be.
+                # If we can't, fall through to the remaining preferences
+                if search_keys[index] == track_key and (
+                    not wanted_language
+                    or caption_track.is_translatable
+                    and self.is_translation_supported(wanted_language)
+                ):
+                    best_index = index
+                    best_preference = preference
+                    best_caption_track = deepcopy(caption_track)
+                    break
+
+        if best_index is None:
+            return None
+
+        if best_preference.translated_language_code:
+            # Convert the track to a translated language if required, we've
+            # checked above this is ok.
+            best_caption_track.translated_language_code = (
+                best_preference.translated_language_code
+            )
+
+        return best_caption_track
+
+
+@dataclass
+class Video:
+    """Data for a video in YouTube."""
+
+    caption: Optional[Captions] = None
+    """Caption related information (tracks and languages)."""
+
+    details: Optional[VideoDetails] = None
+    """Metadata for the video."""
+
+    playability_status: Optional[str] = None
+    """Indicator of whether the video can be played."""
+
+    url: Optional[str] = None
+    """The URL for the video, added by us, rather than from the JSON."""
+
+    @classmethod
+    def from_json(cls, url, data):
+        captions = safe_get(data, ["captions", "playerCaptionsTracklistRenderer"])
+        details = data.get("videoDetails")
+
+        return cls(
+            caption=Captions.from_json(captions) if captions else None,
+            details=VideoDetails.from_json(details) if details else None,
+            playability_status=safe_get(data, ["playabilityStatus", "status"]),
+            url=url,
+        )
+
+    @property
+    def is_playable(self) -> bool:
+        """Can this video be played?"""
+
+        return self.playability_status == "OK"
+
+    @property
+    def has_captions(self) -> bool:
+        """Does this video have captions?"""
+
+        return bool(self.caption and self.caption.tracks)
+
+    @property
+    def id(self) -> Optional[str]:
+        """Get the ID of this video."""
+
+        # Just a convenience accessor, as having the id tucked away in the
+        # details object is weird.
+        return self.details.id if self.details else None
+
+
+@dataclass
+class TranscriptText:
+    """An individual row of transcript text."""
+
+    text: str
+    start: float
+    duration: float
+
+
+@dataclass
+class Transcript:
+    """A full transcript from a caption track."""
+
+    track: CaptionTrack
+    text: List[TranscriptText]