From d75533cb255d15aaabd79e77ff8fffa989a072e2 Mon Sep 17 00:00:00 2001 From: blackicedbear Date: Mon, 2 Mar 2026 01:02:55 +0100 Subject: [PATCH] refactor(RTLP): improve code style and fix logic bugs --- RTLP/__init__.py | 584 ++++++++++++++++++++++++----------------------- 1 file changed, 294 insertions(+), 290 deletions(-) diff --git a/RTLP/__init__.py b/RTLP/__init__.py index b87a08a..866f9e0 100644 --- a/RTLP/__init__.py +++ b/RTLP/__init__.py @@ -1,30 +1,32 @@ from __future__ import annotations +import base64 +import hashlib +import json +import re +import secrets from http.cookiejar import MozillaCookieJar from typing import Any, Optional, Union import click import requests -from devine.core.titles import Episode, Movie, Movies, Series -from devine.core.manifests import DASH from devine.core.constants import AnyTrack -from devine.core.service import Service from devine.core.credential import Credential -from devine.core.tracks import Chapters, Tracks, Track +from devine.core.manifests import DASH +from devine.core.service import Service +from devine.core.titles import Episode, Movie, Movies, Series +from devine.core.tracks import Chapters, Tracks -import re -import json -import base64 -import hashlib -import secrets class RTLP(Service): + """RTL+ (plus.rtl.de) streaming service.""" + # List of Service Aliases. Do NOT include the Service Tag. All aliases must be lowercase. ALIASES = () # List of regions of which the service offers support for. - GEOFENCE = ("de", "at") + GEOFENCE = ("de", "at", "ch") TITLE_RE = r"^https?:\/\/plus\.rtl\.de\/video-tv\/(?Pshows|serien|filme)\/(?:[^\/]+-)?(?P\d+)(?:\/[^\/]+-)?(?P\d+)?(?:\/[^\/]+-)?(?P\d+)?$" AUTH_CODE_REGEX = r"code=([\w-]+\.[\w-]+\.[\w-]+)" @@ -38,17 +40,11 @@ class RTLP(Service): def __init__(self, ctx: click.Context, title: str): self.title = title - super().__init__(ctx) def get_session(self) -> requests.Session: - # modify the creation of the requests session (stored as self.session) - # make a super() call to take the original result and further modify it, - # or don't to make a completely fresh one if required. - session = super().get_session() - # Set default headers as specified session.headers.update({ 'Accept': '*/*', 'Accept-Language': 'de,de-DE;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6', @@ -56,7 +52,7 @@ class RTLP(Service): 'Referer': 'https://plus.rtl.de/', 'Rtlplus-Client-Id': 'rci:rtlplus:web', 'Rtlplus-Client-Version': '2024.7.29.2', - 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36 Edg/137.0.0.0' + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36 Edg/137.0.0.0', }) return session @@ -66,14 +62,21 @@ class RTLP(Service): if not cookies: self._authenticate_anonymous() - else: self._authenticate_with_cookies(cookies) - def generate_code_verifier(self): + # ------------------------------------------------------------------ + # Authentication helpers + # ------------------------------------------------------------------ + + @staticmethod + def _generate_code_verifier() -> str: + """Generate a PKCE code verifier.""" return secrets.token_urlsafe(64) - def generate_code_challenge(self, verifier): + @staticmethod + def _generate_code_challenge(verifier: str) -> str: + """Generate a PKCE code challenge from a verifier.""" sha256_hash = hashlib.sha256(verifier.encode()).digest() return base64.urlsafe_b64encode(sha256_hash).decode().rstrip("=") @@ -81,21 +84,18 @@ class RTLP(Service): auth_url = self.config["endpoints"]["auth_url"] token_url = self.config["endpoints"]["token_url"] - code_verifier = self.generate_code_verifier() - code_challenge = self.generate_code_challenge(code_verifier) + code_verifier = self._generate_code_verifier() + code_challenge = self._generate_code_challenge(code_verifier) redirect_url_request = self.session.get( auth_url, - headers={ - **self.session.headers, - }, params={ 'client_id': 'rtlplus-web', 'redirect_uri': 'https://plus.rtl.de/silent-check-sso.html', 'response_type': 'code', 'scope': 'openid', 'code_challenge_method': 'S256', - 'code_challenge': code_challenge + 'code_challenge': code_challenge, }, cookies=cookies, ) @@ -104,32 +104,33 @@ class RTLP(Service): redirect_url = redirect_url_request.url auth_code_match = re.search(self.AUTH_CODE_REGEX, redirect_url) - if auth_code_match: - auth_code = auth_code_match.group(1) - self.log.debug(f"Auth Code: {auth_code}") - else: - self.log.error("Authorization code not found in redirect URL.") - raise EnvironmentError("Could not find authorization code.") + if not auth_code_match: + raise EnvironmentError("Authorization code not found in redirect URL.") + + auth_code = auth_code_match.group(1) + self.log.debug(f"Auth Code: {auth_code}") response = self.session.post( token_url, headers={ - **self.session.headers, - 'Content-Type': 'application/x-www-form-urlencoded' + 'Content-Type': 'application/x-www-form-urlencoded', }, cookies=cookies, - data=bytes(f'grant_type=authorization_code&client_id=rtlplus-web&redirect_uri=https%3A%2F%2Fplus.rtl.de%2Fsilent-check-sso.html&code={auth_code}&code_verifier={code_verifier}', 'utf-8'), + data=bytes( + f'grant_type=authorization_code&client_id=rtlplus-web' + f'&redirect_uri=https%3A%2F%2Fplus.rtl.de%2Fsilent-check-sso.html' + f'&code={auth_code}&code_verifier={code_verifier}', + 'utf-8', + ), ) response.raise_for_status() auth_response = response.json() - if 'access_token' in auth_response: - self._rtlp_auth_jwt = auth_response['access_token'] - self.log.info("Successfully authenticated with cookies.") - else: - self.log.error("No access_token found in response.") + if 'access_token' not in auth_response: raise EnvironmentError("Cookie authentication failed: no access token in response.") + self._rtlp_auth_jwt = auth_response['access_token'] + self.log.info("Successfully authenticated with cookies.") def _authenticate_anonymous(self) -> None: token_url = self.config["endpoints"]["token_url"] @@ -137,322 +138,325 @@ class RTLP(Service): response = self.session.post( token_url, headers={ - **self.session.headers, - 'Content-Type': 'application/x-www-form-urlencoded' + 'Content-Type': 'application/x-www-form-urlencoded', }, - data=bytes('grant_type=client_credentials&client_id=anonymous-user&client_secret=4bfeb73f-1c4a-4e9f-a7fa-96aa1ad3d94c', 'utf-8'), + data=bytes( + 'grant_type=client_credentials&client_id=anonymous-user' + '&client_secret=4bfeb73f-1c4a-4e9f-a7fa-96aa1ad3d94c', + 'utf-8', + ), ) response.raise_for_status() auth_response = response.json() - - if 'access_token' in auth_response: - self._rtlp_auth_jwt = auth_response['access_token'] - self.log.info("Authenticated anonymously with RTL+ service successfully.") - else: - self.log.error("No access_token found in response.") + if 'access_token' not in auth_response: raise EnvironmentError("Anonymous authentication failed: no access token in response.") - # Required methods: + self._rtlp_auth_jwt = auth_response['access_token'] + self.log.info("Authenticated anonymously with RTL+ service.") + + # ------------------------------------------------------------------ + # Validation helpers + # ------------------------------------------------------------------ + + @staticmethod + def _validate_required_fields(data: dict, fields: list[str], context: str) -> None: + """Raise ValueError if any required fields are missing from data.""" + missing = [f for f in fields if f not in data] + if missing: + raise ValueError(f"Missing required fields {missing} in {context}.") + + # ------------------------------------------------------------------ + # Required service methods + # ------------------------------------------------------------------ def get_titles(self) -> Union[Movies, Series]: - graphql_url = self.config["endpoints"]["graphql_url"] + match = re.match(self.TITLE_RE, self.title) + if not match: + raise ValueError(f"Could not parse ID from title — is the URL correct? ({self.title})") - try: - kind, show_id, season_id, episode_id = ( - re.match(self.TITLE_RE, self.title).group(i) for i in ("kind", "show_id", "season_id", "episode_id") - ) - except Exception: - raise ValueError("Could not parse ID from title - is the URL correct?") + kind = match.group("kind") + show_id = match.group("show_id") + season_id = match.group("season_id") + episode_id = match.group("episode_id") if not kind or not show_id: - self.log.error(f"Invalid title URL: {self.title}. 'kind' or 'show_id' is missing.") - raise ValueError("Invalid title URL: 'kind' or 'show_id' is missing.") + raise ValueError(f"Invalid title URL: 'kind' or 'show_id' is missing. ({self.title})") if kind == "filme": - content_id = f'rrn:watch:videohub:movie:{show_id}' + return self._get_movie_titles(show_id) + elif kind in ("shows", "serien"): + return self._get_series_titles(show_id, season_id, episode_id) - response_data = self._execute_graphql_query('MovieDetail', {'id': content_id}, 'b1c360212cc518ddca2b8377813a54fa918ca424c08086204b7bf7d6ef626ac4') + raise ValueError(f"Unsupported content type: '{kind}'.") - if 'movie' in response_data: - movie_data = response_data['movie'] + def _get_movie_titles(self, show_id: str) -> Movies: + """Fetch and return movie title data.""" + content_id = f'rrn:watch:videohub:movie:{show_id}' - if 'id' not in movie_data or 'title' not in movie_data or 'productionYear' not in movie_data: - self.log.error("Invalid movie_data data received.") - raise ValueError("Invalid movie_data data received from RTL+ service.") + response_data = self._execute_graphql_query( + 'MovieDetail', {'id': content_id}, + 'b1c360212cc518ddca2b8377813a54fa918ca424c08086204b7bf7d6ef626ac4', + ) - self.log.debug(f"Movie ID: {content_id}, Title: {movie_data['title']}") + movie_data = response_data.get('movie') + if not movie_data: + raise ValueError("Failed to fetch movie data from RTL+ service.") - return Movies( - [ - Movie( - id_=content_id, - service=self.__class__, - name=movie_data['title'], - data=movie_data, - year=movie_data['productionYear'], - ) - ] - ) + self._validate_required_fields(movie_data, ['id', 'title', 'productionYear'], 'movie_data') - self.log.error(f"Failed to fetch Movie data: {response.status_code} - {response.text}") - raise EnvironmentError("Failed to fetch Movie data from RTL+ service.") + self.log.debug(f"Movie ID: {content_id}, Title: {movie_data['title']}") - if kind == "shows" or kind == "serien": - if episode_id: - content_id = f'rrn:watch:videohub:episode:{episode_id}' + return Movies([ + Movie( + id_=content_id, + service=self.__class__, + name=movie_data['title'], + data=movie_data, + year=movie_data['productionYear'], + ) + ]) - response_data = self._execute_graphql_query('EpisodeDetail', {'episodeId': content_id}, '2e5ef142c79f8620e8e93c8f21b31a463b16d89a557f7f5f0c4a7e063be96a8a') + def _get_series_titles(self, show_id: str, season_id: Optional[str], episode_id: Optional[str]) -> Series: + """Route to the appropriate series fetch method based on URL specificity.""" + if episode_id: + return self._get_single_episode(episode_id) + elif season_id: + return self._get_single_season(season_id) + elif show_id: + return self._get_full_show(show_id) - if 'episode' in response_data: - episode_data = response_data['episode'] + raise ValueError("No valid identifier found in series URL.") - if 'id' not in episode_data or 'title' not in episode_data or 'number' not in episode_data or 'episodeSeason' not in episode_data: - self.log.error("Invalid episode data received.") - raise ValueError("Invalid episode data received from RTL+ service.") + def _get_single_episode(self, episode_id: str) -> Series: + """Fetch a single episode by its ID.""" + content_id = f'rrn:watch:videohub:episode:{episode_id}' - if 'format' not in episode_data and 'title' not in episode_data['format']: - self.log.error("Invalid episode format received.") - raise ValueError("Invalid episode format received from RTL+ service.") + response_data = self._execute_graphql_query( + 'EpisodeDetail', {'episodeId': content_id}, + '2e5ef142c79f8620e8e93c8f21b31a463b16d89a557f7f5f0c4a7e063be96a8a', + ) - return Series( - [ - Episode( - id_=content_id, - service=self.__class__, - title=episode_data['format']['title'], - season=self.get_episode_session(episode_data), - number=episode_data['number'], - name=episode_data['title'], - data=episode_data, - ) - ] - ) - - elif season_id: - content_id = f'rrn:watch:videohub:season:{season_id}' + episode_data = response_data.get('episode') + if not episode_data: + raise ValueError("Failed to fetch episode data from RTL+ service.") - response_data = self._execute_graphql_query('SeasonWithFormatAndEpisodes', {'seasonId': content_id}, 'cc0fbbe17143f549a35efa6f8665ceb9b1cfae44b590f0b2381a9a304304c584') + self._validate_required_fields(episode_data, ['id', 'title', 'number', 'episodeSeason'], 'episode_data') - if 'season' in response_data: - season_data = response_data['season'] + if 'format' not in episode_data or 'title' not in episode_data['format']: + raise ValueError("Missing 'format.title' in episode_data.") - if 'format' not in season_data or 'title' not in season_data['format']: - self.log.error("Invalid season format received.") - raise ValueError("Invalid season format received from RTL+ service.") + return Series([ + Episode( + id_=content_id, + service=self.__class__, + title=episode_data['format']['title'], + season=self._get_episode_season(episode_data), + number=episode_data['number'], + name=episode_data['title'], + data=episode_data, + ) + ]) - if not 'episodes' in season_data or not isinstance(season_data['episodes'], list): - self.log.error("Invalid season data received.") - raise ValueError("Invalid season data received from RTL+ service.") + def _get_single_season(self, season_id: str) -> Series: + """Fetch all episodes from a single season.""" + content_id = f'rrn:watch:videohub:season:{season_id}' - episodes = [] - for episode in season_data['episodes']: - if 'id' not in episode or 'title' not in episode or 'number' not in episode or 'episodeSeason' not in episode: - self.log.error("Invalid episode data received.") - raise ValueError("Invalid episode data received from RTL+ service.") + response_data = self._execute_graphql_query( + 'SeasonWithFormatAndEpisodes', {'seasonId': content_id}, + 'cc0fbbe17143f549a35efa6f8665ceb9b1cfae44b590f0b2381a9a304304c584', + ) - episodes.append( - Episode( - id_=episode['id'], - service=self.__class__, - title=season_data['format']['title'], - season=self.get_episode_session(episode), - number=episode['number'], - name=episode['title'], - data=episode, - ) - ) + season_data = response_data.get('season') + if not season_data: + raise ValueError("Failed to fetch season data from RTL+ service.") - return Series( - episodes - ) + if 'format' not in season_data or 'title' not in season_data['format']: + raise ValueError("Missing 'format.title' in season_data.") - elif show_id: - content_id = f'rrn:watch:videohub:format:{show_id}' + if 'episodes' not in season_data or not isinstance(season_data['episodes'], list): + raise ValueError("Missing or invalid 'episodes' in season_data.") - response_data = self._execute_graphql_query('Format', {'id': content_id}, 'd112638c0184ab5698af7b69532dfe2f12973f7af9cb137b9f70278130b1eafa') + episodes = [] + for episode in season_data['episodes']: + self._validate_required_fields(episode, ['id', 'title', 'number', 'episodeSeason'], 'episode') - if 'format' in response_data: - format_data = response_data['format'] + episodes.append(Episode( + id_=episode['id'], + service=self.__class__, + title=season_data['format']['title'], + season=self._get_episode_season(episode), + number=episode['number'], + name=episode['title'], + data=episode, + )) - if 'title' not in format_data or 'id' not in format_data: - self.log.error("Invalid format data received.") - raise ValueError("Invalid format data received from RTL+ service.") + return Series(episodes) - if 'seasons' not in format_data or not isinstance(format_data['seasons'], list): - self.log.error("Invalid format seasons data received.") - raise ValueError("Invalid format seasons data received from RTL+ service.") + def _get_full_show(self, show_id: str) -> Series: + """Fetch all episodes across all seasons of a show/format.""" + content_id = f'rrn:watch:videohub:format:{show_id}' - episodes = [] - for season in format_data['seasons']: - if not 'id' in season or not 'seasonType' in season: - self.log.error("Invalid season data received.") - raise ValueError("Invalid season data received from RTL+ service.") + response_data = self._execute_graphql_query( + 'Format', {'id': content_id}, + 'd112638c0184ab5698af7b69532dfe2f12973f7af9cb137b9f70278130b1eafa', + ) - season_id = season['id'] + format_data = response_data.get('format') + if not format_data: + raise ValueError("Failed to fetch format data from RTL+ service.") - response_data = self._execute_graphql_query('SeasonWithFormatAndEpisodes', {'seasonId': season_id}, 'cc0fbbe17143f549a35efa6f8665ceb9b1cfae44b590f0b2381a9a304304c584') + self._validate_required_fields(format_data, ['id', 'title', 'seasons'], 'format_data') - if 'season' in response_data: - season_data = response_data['season'] + if not isinstance(format_data['seasons'], list): + raise ValueError("Invalid 'seasons' in format_data (expected list).") - if 'format' not in season_data or 'title' not in season_data['format']: - self.log.error("Invalid season format received.") - raise ValueError("Invalid season format received from RTL+ service.") + episodes = [] + for season in format_data['seasons']: + self._validate_required_fields(season, ['id', 'seasonType'], 'season') - if not 'episodes' in season_data or not isinstance(season_data['episodes'], list): - self.log.error("Invalid season data received.") - raise ValueError("Invalid season data received from RTL+ service.") + season_response = self._execute_graphql_query( + 'SeasonWithFormatAndEpisodes', {'seasonId': season['id']}, + 'cc0fbbe17143f549a35efa6f8665ceb9b1cfae44b590f0b2381a9a304304c584', + ) - for episode in season_data['episodes']: - if 'id' not in episode or 'title' not in episode or 'number' not in episode or 'episodeSeason' not in episode: - self.log.error("Invalid episode data received.") - raise ValueError("Invalid episode data received from RTL+ service.") + season_data = season_response.get('season') + if not season_data: + continue - episodes.append( - Episode( - id_=episode['id'], - service=self.__class__, - title=season_data['format']['title'], - season=self.get_episode_session(episode), - number=episode['number'], - name=episode['title'], - data=episode, - ) - ) + if 'format' not in season_data or 'title' not in season_data['format']: + raise ValueError("Missing 'format.title' in season_data.") - return Series( - episodes - ) + if 'episodes' not in season_data or not isinstance(season_data['episodes'], list): + raise ValueError("Missing or invalid 'episodes' in season_data.") - self.log.error(f"Failed to fetch series data: {response.status_code} - {response.text}") - raise EnvironmentError("Failed to fetch series data from RTL+ service.") + for episode in season_data['episodes']: + self._validate_required_fields(episode, ['id', 'title', 'number', 'episodeSeason'], 'episode') + episodes.append(Episode( + id_=episode['id'], + service=self.__class__, + title=season_data['format']['title'], + season=self._get_episode_season(episode), + number=episode['number'], + name=episode['title'], + data=episode, + )) + + return Series(episodes) def get_tracks(self, title: Union[Episode, Movie]) -> Tracks: - playout_url = self.config["endpoints"]["playout"] + if not isinstance(title, (Episode, Movie)): + raise TypeError(f"Expected Episode or Movie, got {type(title).__name__}.") - if isinstance(title, Episode) or isinstance(title, Movie): - playout_url = playout_url.format(id=title.data['id']) - else: - self.log.error(f"Unsupported title type: {type(title)}. Expected Series or Movies.") - raise ValueError(f"Unsupported title type: {type(title)}. Expected Series or Movies.") + playout_url = self.config["endpoints"]["playout"].format(id=title.data['id']) response = self.session.get( playout_url, headers={ - **self.session.headers, - 'x-auth-token': self._rtlp_auth_jwt - } - ) - - if response and response.status_code == 200: - response_data = response.json() - - all_parsed_tracks = [] # Use a list to collect all individual track objects - - for variant in response_data: - if 'name' not in variant: - self.log.error("Invalid playout variant data received.") - raise ValueError("Invalid playout variant data received from RTL+ service.") - - # Assuming 'dashsd' and 'dashhd' variants contain the MPD URLs - if variant['name'] == 'dashhd': - if 'sources' not in variant and len(variant['sources']) == 0: - self.log.warning(f"Variant '{variant['name']}' has no sources. Skipping.") - continue - - source_entry = variant['sources'][0] - - # Assuming the 'url' key in each source_entry is the DASH manifest URL - if not 'url' in source_entry: - self.log.warning(f"DASH source entry missing 'url': {source_entry}. Skipping.") - continue - - manifest_url = source_entry['url'] - - try: - all_parsed_tracks = DASH.from_url(manifest_url, self.session).to_tracks(language="de") # Use title's language for filtering/tagging - except Exception as e: - self.log.error(f"Failed to parse DASH manifest from {manifest_url}: {e}") - # Decide if you want to raise or just log and continue for other manifests - continue - - # Return a new Tracks object containing all collected tracks - return Tracks(all_parsed_tracks) - - else: - self.log.error(f"Failed to fetch tracks data: {response.status_code} - {response.text}") - raise EnvironmentError("Failed to fetch tracks data from RTL+ service.") - - def get_chapters(self, title: Union[Movies, Series]) -> list[Chapter]: - # technically optional, but you must define and at least `return []`. - return Chapters() - - def get_widevine_service_certificate(self, *, challenge: bytes, title: Union[Movies, Series], track: AnyTrack) -> Union[bytes, str]: - return None - - def get_widevine_license(self, *, challenge: bytes, title: Union[Movies, Series], track: AnyTrack) -> Optional[Union[bytes, str]]: - license_url = self.config["endpoints"]["license"] - - response = self.session.post( - license_url, - headers={ - **self.session.headers, - 'Content-Type': 'application/octet-stream', - 'x-auth-token': self._rtlp_auth_jwt + 'x-auth-token': self._rtlp_auth_jwt, }, - data=challenge - ) - - if response.status_code != 200: - self.log.error(f"Failed to fetch license: {response.status_code} - {response.text}") - raise ConnectionError(response.text) - - self.log.info("Successfully fetched Widevine license from RTL+ service.") - return response.content - - def _execute_graphql_query(self, operation_name: str, variables: dict, persisted_query_hash: str) -> dict: - response = self.session.get( - self.config["endpoints"]["graphql_url"], - headers={ - **self.session.headers, - 'Authorization': f'Bearer {self._rtlp_auth_jwt}' - }, - params={ - 'operationName': operation_name, - 'variables': json.dumps(variables).encode(), - 'extensions': json.dumps({ - 'persistedQuery': {'version': 1, 'sha256Hash': persisted_query_hash} - }).encode() - } ) response.raise_for_status() response_data = response.json() + all_parsed_tracks = [] + for variant in response_data: + if 'name' not in variant: + raise ValueError("Invalid playout variant data: missing 'name'.") + + if variant['name'] != 'dashhd': + continue + + if 'sources' not in variant or len(variant['sources']) == 0: + self.log.warning(f"Variant '{variant['name']}' has no sources. Skipping.") + continue + + source_entry = variant['sources'][0] + + if 'url' not in source_entry: + self.log.warning(f"DASH source entry missing 'url': {source_entry}. Skipping.") + continue + + manifest_url = source_entry['url'] + + try: + all_parsed_tracks = DASH.from_url(manifest_url, self.session).to_tracks(language="de") + except Exception as e: + self.log.error(f"Failed to parse DASH manifest from {manifest_url}: {e}") + continue + + return Tracks(all_parsed_tracks) + + def get_chapters(self, title: Union[Movies, Series]) -> list: + return Chapters() + + def get_widevine_service_certificate(self, *, challenge: bytes, title: Union[Movies, Series], track: AnyTrack) -> Union[bytes, str]: + return None + + def get_widevine_license( + self, *, challenge: bytes, title: Union[Movies, Series], track: AnyTrack, + ) -> Optional[Union[bytes, str]]: + """Obtain a Widevine license for the given track.""" + license_url = self.config["endpoints"]["license"] + + response = self.session.post( + license_url, + headers={ + 'Content-Type': 'application/octet-stream', + 'x-auth-token': self._rtlp_auth_jwt, + }, + data=challenge, + ) + response.raise_for_status() + + self.log.info("Successfully fetched Widevine license.") + return response.content + + # ------------------------------------------------------------------ + # GraphQL & utility helpers + # ------------------------------------------------------------------ + + def _execute_graphql_query(self, operation_name: str, variables: dict, persisted_query_hash: str) -> dict: + """Execute a persisted GraphQL query and return the 'data' payload.""" + response = self.session.get( + self.config["endpoints"]["graphql_url"], + headers={ + 'Authorization': f'Bearer {self._rtlp_auth_jwt}', + }, + params={ + 'operationName': operation_name, + 'variables': json.dumps(variables).encode(), + 'extensions': json.dumps({ + 'persistedQuery': {'version': 1, 'sha256Hash': persisted_query_hash}, + }).encode(), + }, + ) + response.raise_for_status() + + response_data = response.json() if 'data' not in response_data: - self.log.error(f"GraphQL response for '{operation_name}' missing 'data' field.") - raise ValueError(f"Invalid GraphQL response for '{operation_name}'.") + raise ValueError(f"Invalid GraphQL response for '{operation_name}': missing 'data' field.") return response_data['data'] - def get_episode_session(self, episode) -> int: - season_value = None - if 'seasonType' not in episode['episodeSeason']: - self.log.error("Invalid episode season received.") - raise ValueError("Invalid episode season received from RTL+ service.") + @staticmethod + def _get_episode_season(episode: dict) -> Optional[int]: + """Extract the season number (ordinal or year) from episode data.""" + episode_season = episode.get('episodeSeason', {}) + season_type = episode_season.get('seasonType') - seasonType = episode['episodeSeason']['seasonType'] + if not season_type: + raise ValueError("Missing 'seasonType' in episode season data.") - if seasonType == 'ANNUAL': - if 'season' in episode['episodeSeason'] and 'year' in episode['episodeSeason']['season']: - season_value = int(episode['episodeSeason']['season']['year']) - elif seasonType == 'ORDINAL': - if 'season' in episode['episodeSeason'] and 'ordinal' in episode['episodeSeason']['season']: - season_value = int(episode['episodeSeason']['season']['ordinal']) - else: - self.log.error(f"Unknown season type '{seasonType}' received.") - raise ValueError(f"Unknown season type '{seasonType}' received from RTL+ service.") + season_info = episode_season.get('season', {}) - return season_value \ No newline at end of file + if season_type == 'ANNUAL': + year = season_info.get('year') + return int(year) if year is not None else None + elif season_type == 'ORDINAL': + ordinal = season_info.get('ordinal') + return int(ordinal) if ordinal is not None else None + + raise ValueError(f"Unknown season type '{season_type}'.") \ No newline at end of file