From 50d10bd260fe4323c7bf3f6011e4efadbc218b69 Mon Sep 17 00:00:00 2001 From: Jon Betts Date: Wed, 19 Jul 2023 17:18:14 +0100 Subject: [PATCH] Add a client for getting data from YouTube --- .../via/services/youtube_api/client_test.py | 77 ++++++++++++++++++ via/services/youtube_api/__init__.py | 1 + via/services/youtube_api/client.py | 79 +++++++++++++++++++ 3 files changed, 157 insertions(+) create mode 100644 tests/unit/via/services/youtube_api/client_test.py create mode 100644 via/services/youtube_api/client.py diff --git a/tests/unit/via/services/youtube_api/client_test.py b/tests/unit/via/services/youtube_api/client_test.py new file mode 100644 index 00000000..9783240b --- /dev/null +++ b/tests/unit/via/services/youtube_api/client_test.py @@ -0,0 +1,77 @@ +from unittest.mock import sentinel + +import pytest + +from via.services.youtube_api import ( + CaptionTrack, + Transcript, + TranscriptText, + YouTubeAPIClient, +) + + +class TestYouTubeAPIClient: + def test_get_video_info(self, client, http_session, Video): + video = client.get_video_info("VIDEO_ID") + + http_session.post.assert_called_once_with( + "https://youtubei.googleapis.com/youtubei/v1/player", + json={ + "context": { + "client": { + "hl": "en", + "clientName": "WEB", + # Suspicious value right here... + "clientVersion": "2.20210721.00.00", + } + }, + "videoId": "VIDEO_ID", + }, + ) + response = http_session.post.return_value + response.json.assert_called_once_with() + + Video.from_v1_json.assert_called_once_with(data=response.json.return_value) + assert video == Video.from_v1_json.return_value + + def test_get_transcript(self, client, http_session): + caption_track = CaptionTrack("en", base_url=sentinel.url) + response = http_session.get.return_value + response.text = """ + + Hey there guys, + + Lichen' subscribe + + <font color="#A0AAB4">Buy my merch!</font> + + + """ + + transcript = client.get_transcript(caption_track) + + http_session.get.assert_called_once_with(url=caption_track.url) + assert transcript == Transcript( + track=caption_track, + text=[ + TranscriptText(text="Hey there guys,", start=0.21, duration=1.387), + TranscriptText(text="Lichen' subscribe", start=1.597, duration=0.0), + TranscriptText(text="Buy my merch!", start=4.327, duration=2.063), + ], + ) + + def test_get_transcript_with_no_url(self, client): + with pytest.raises(ValueError): + client.get_transcript(CaptionTrack("en", base_url=None)) + + @pytest.fixture + def client(self): + return YouTubeAPIClient() + + @pytest.fixture + def Video(self, patch): + return patch("via.services.youtube_api.client.Video") + + @pytest.fixture(autouse=True) + def http_session(self, patch): + return patch("via.services.youtube_api.client.HTTPService").return_value diff --git a/via/services/youtube_api/__init__.py b/via/services/youtube_api/__init__.py index a34d393e..d495e7e6 100644 --- a/via/services/youtube_api/__init__.py +++ b/via/services/youtube_api/__init__.py @@ -1 +1,2 @@ +from via.services.youtube_api.client import YouTubeAPIClient from via.services.youtube_api.models import * diff --git a/via/services/youtube_api/client.py b/via/services/youtube_api/client.py new file mode 100644 index 00000000..84268b0f --- /dev/null +++ b/via/services/youtube_api/client.py @@ -0,0 +1,79 @@ +from xml.etree import ElementTree + +import requests + +from via.services import HTTPService +from via.services.youtube_api.models import ( + CaptionTrack, + Transcript, + TranscriptText, + Video, +) + + +class YouTubeAPIError(Exception): + """Something has gone wrong interacting with YouTube.""" + + +class YouTubeAPIClient: + """A client for interacting with YouTube and manipulating related URLs.""" + + def __init__(self): + session = requests.Session() + # Ensure any translations that Google provides are in English + session.headers["Accept-Language"] = "en-US" + self._http = HTTPService(session=session) + + def get_video_info(self, video_id: str) -> Video: + """Get information for a given YouTube video.""" + + response = self._http.post( + "https://youtubei.googleapis.com/youtubei/v1/player", + json={ + "context": { + "client": { + "hl": "en", + "clientName": "WEB", + # Suspicious value right here... + "clientVersion": "2.20210721.00.00", + } + }, + "videoId": video_id, + }, + ) + + return Video.from_v1_json(data=response.json()) + + def get_transcript(self, caption_track: CaptionTrack) -> Transcript: + """Get the transcript associated with a caption track. + + You can set the track `translated_language_code` to ensure we translate + the value before returning it. + """ + + if not caption_track.url: + raise ValueError("Cannot get a transcript without a URL") + + response = self._http.get(url=caption_track.url) + xml_elements = ElementTree.fromstring(response.text) + + return Transcript( + track=caption_track, + text=[ + TranscriptText( + text=self._strip_html(xml_element.text), + start=float(xml_element.attrib["start"]), + duration=float(xml_element.attrib.get("dur", "0.0")), + ) + for xml_element in xml_elements + if xml_element.text is not None + ], + ) + + @staticmethod + def _strip_html(xml_string): + """Remove all non-text content from an XML fragment or string.""" + + return "".join( + ElementTree.fromstring(f"{xml_string}").itertext() + ).strip()