refactor(RTLP): improve code style and fix logic bugs

This commit is contained in:
2026-03-02 01:02:55 +01:00
parent 5bf50ecba4
commit d75533cb25

View File

@@ -1,30 +1,32 @@
from __future__ import annotations from __future__ import annotations
import base64
import hashlib
import json
import re
import secrets
from http.cookiejar import MozillaCookieJar from http.cookiejar import MozillaCookieJar
from typing import Any, Optional, Union from typing import Any, Optional, Union
import click import click
import requests import requests
from devine.core.titles import Episode, Movie, Movies, Series
from devine.core.manifests import DASH
from devine.core.constants import AnyTrack from devine.core.constants import AnyTrack
from devine.core.service import Service
from devine.core.credential import Credential from devine.core.credential import Credential
from devine.core.tracks import Chapters, Tracks, Track from devine.core.manifests import DASH
from devine.core.service import Service
from devine.core.titles import Episode, Movie, Movies, Series
from devine.core.tracks import Chapters, Tracks
import re
import json
import base64
import hashlib
import secrets
class RTLP(Service): class RTLP(Service):
"""RTL+ (plus.rtl.de) streaming service."""
# List of Service Aliases. Do NOT include the Service Tag. All aliases must be lowercase. # List of Service Aliases. Do NOT include the Service Tag. All aliases must be lowercase.
ALIASES = () ALIASES = ()
# List of regions of which the service offers support for. # List of regions of which the service offers support for.
GEOFENCE = ("de", "at") GEOFENCE = ("de", "at", "ch")
TITLE_RE = r"^https?:\/\/plus\.rtl\.de\/video-tv\/(?P<kind>shows|serien|filme)\/(?:[^\/]+-)?(?P<show_id>\d+)(?:\/[^\/]+-)?(?P<season_id>\d+)?(?:\/[^\/]+-)?(?P<episode_id>\d+)?$" TITLE_RE = r"^https?:\/\/plus\.rtl\.de\/video-tv\/(?P<kind>shows|serien|filme)\/(?:[^\/]+-)?(?P<show_id>\d+)(?:\/[^\/]+-)?(?P<season_id>\d+)?(?:\/[^\/]+-)?(?P<episode_id>\d+)?$"
AUTH_CODE_REGEX = r"code=([\w-]+\.[\w-]+\.[\w-]+)" AUTH_CODE_REGEX = r"code=([\w-]+\.[\w-]+\.[\w-]+)"
@@ -38,17 +40,11 @@ class RTLP(Service):
def __init__(self, ctx: click.Context, title: str): def __init__(self, ctx: click.Context, title: str):
self.title = title self.title = title
super().__init__(ctx) super().__init__(ctx)
def get_session(self) -> requests.Session: def get_session(self) -> requests.Session:
# modify the creation of the requests session (stored as self.session)
# make a super() call to take the original result and further modify it,
# or don't to make a completely fresh one if required.
session = super().get_session() session = super().get_session()
# Set default headers as specified
session.headers.update({ session.headers.update({
'Accept': '*/*', 'Accept': '*/*',
'Accept-Language': 'de,de-DE;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6', 'Accept-Language': 'de,de-DE;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
@@ -56,7 +52,7 @@ class RTLP(Service):
'Referer': 'https://plus.rtl.de/', 'Referer': 'https://plus.rtl.de/',
'Rtlplus-Client-Id': 'rci:rtlplus:web', 'Rtlplus-Client-Id': 'rci:rtlplus:web',
'Rtlplus-Client-Version': '2024.7.29.2', 'Rtlplus-Client-Version': '2024.7.29.2',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36 Edg/137.0.0.0' 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36 Edg/137.0.0.0',
}) })
return session return session
@@ -66,14 +62,21 @@ class RTLP(Service):
if not cookies: if not cookies:
self._authenticate_anonymous() self._authenticate_anonymous()
else: else:
self._authenticate_with_cookies(cookies) self._authenticate_with_cookies(cookies)
def generate_code_verifier(self): # ------------------------------------------------------------------
# Authentication helpers
# ------------------------------------------------------------------
@staticmethod
def _generate_code_verifier() -> str:
"""Generate a PKCE code verifier."""
return secrets.token_urlsafe(64) return secrets.token_urlsafe(64)
def generate_code_challenge(self, verifier): @staticmethod
def _generate_code_challenge(verifier: str) -> str:
"""Generate a PKCE code challenge from a verifier."""
sha256_hash = hashlib.sha256(verifier.encode()).digest() sha256_hash = hashlib.sha256(verifier.encode()).digest()
return base64.urlsafe_b64encode(sha256_hash).decode().rstrip("=") return base64.urlsafe_b64encode(sha256_hash).decode().rstrip("=")
@@ -81,21 +84,18 @@ class RTLP(Service):
auth_url = self.config["endpoints"]["auth_url"] auth_url = self.config["endpoints"]["auth_url"]
token_url = self.config["endpoints"]["token_url"] token_url = self.config["endpoints"]["token_url"]
code_verifier = self.generate_code_verifier() code_verifier = self._generate_code_verifier()
code_challenge = self.generate_code_challenge(code_verifier) code_challenge = self._generate_code_challenge(code_verifier)
redirect_url_request = self.session.get( redirect_url_request = self.session.get(
auth_url, auth_url,
headers={
**self.session.headers,
},
params={ params={
'client_id': 'rtlplus-web', 'client_id': 'rtlplus-web',
'redirect_uri': 'https://plus.rtl.de/silent-check-sso.html', 'redirect_uri': 'https://plus.rtl.de/silent-check-sso.html',
'response_type': 'code', 'response_type': 'code',
'scope': 'openid', 'scope': 'openid',
'code_challenge_method': 'S256', 'code_challenge_method': 'S256',
'code_challenge': code_challenge 'code_challenge': code_challenge,
}, },
cookies=cookies, cookies=cookies,
) )
@@ -104,32 +104,33 @@ class RTLP(Service):
redirect_url = redirect_url_request.url redirect_url = redirect_url_request.url
auth_code_match = re.search(self.AUTH_CODE_REGEX, redirect_url) auth_code_match = re.search(self.AUTH_CODE_REGEX, redirect_url)
if auth_code_match: if not auth_code_match:
auth_code = auth_code_match.group(1) raise EnvironmentError("Authorization code not found in redirect URL.")
self.log.debug(f"Auth Code: {auth_code}")
else: auth_code = auth_code_match.group(1)
self.log.error("Authorization code not found in redirect URL.") self.log.debug(f"Auth Code: {auth_code}")
raise EnvironmentError("Could not find authorization code.")
response = self.session.post( response = self.session.post(
token_url, token_url,
headers={ headers={
**self.session.headers, 'Content-Type': 'application/x-www-form-urlencoded',
'Content-Type': 'application/x-www-form-urlencoded'
}, },
cookies=cookies, cookies=cookies,
data=bytes(f'grant_type=authorization_code&client_id=rtlplus-web&redirect_uri=https%3A%2F%2Fplus.rtl.de%2Fsilent-check-sso.html&code={auth_code}&code_verifier={code_verifier}', 'utf-8'), data=bytes(
f'grant_type=authorization_code&client_id=rtlplus-web'
f'&redirect_uri=https%3A%2F%2Fplus.rtl.de%2Fsilent-check-sso.html'
f'&code={auth_code}&code_verifier={code_verifier}',
'utf-8',
),
) )
response.raise_for_status() response.raise_for_status()
auth_response = response.json() auth_response = response.json()
if 'access_token' in auth_response: if 'access_token' not in auth_response:
self._rtlp_auth_jwt = auth_response['access_token']
self.log.info("Successfully authenticated with cookies.")
else:
self.log.error("No access_token found in response.")
raise EnvironmentError("Cookie authentication failed: no access token in response.") raise EnvironmentError("Cookie authentication failed: no access token in response.")
self._rtlp_auth_jwt = auth_response['access_token']
self.log.info("Successfully authenticated with cookies.")
def _authenticate_anonymous(self) -> None: def _authenticate_anonymous(self) -> None:
token_url = self.config["endpoints"]["token_url"] token_url = self.config["endpoints"]["token_url"]
@@ -137,322 +138,325 @@ class RTLP(Service):
response = self.session.post( response = self.session.post(
token_url, token_url,
headers={ headers={
**self.session.headers, 'Content-Type': 'application/x-www-form-urlencoded',
'Content-Type': 'application/x-www-form-urlencoded'
}, },
data=bytes('grant_type=client_credentials&client_id=anonymous-user&client_secret=4bfeb73f-1c4a-4e9f-a7fa-96aa1ad3d94c', 'utf-8'), data=bytes(
'grant_type=client_credentials&client_id=anonymous-user'
'&client_secret=4bfeb73f-1c4a-4e9f-a7fa-96aa1ad3d94c',
'utf-8',
),
) )
response.raise_for_status() response.raise_for_status()
auth_response = response.json() auth_response = response.json()
if 'access_token' not in auth_response:
if 'access_token' in auth_response:
self._rtlp_auth_jwt = auth_response['access_token']
self.log.info("Authenticated anonymously with RTL+ service successfully.")
else:
self.log.error("No access_token found in response.")
raise EnvironmentError("Anonymous authentication failed: no access token in response.") raise EnvironmentError("Anonymous authentication failed: no access token in response.")
# Required methods: self._rtlp_auth_jwt = auth_response['access_token']
self.log.info("Authenticated anonymously with RTL+ service.")
# ------------------------------------------------------------------
# Validation helpers
# ------------------------------------------------------------------
@staticmethod
def _validate_required_fields(data: dict, fields: list[str], context: str) -> None:
"""Raise ValueError if any required fields are missing from data."""
missing = [f for f in fields if f not in data]
if missing:
raise ValueError(f"Missing required fields {missing} in {context}.")
# ------------------------------------------------------------------
# Required service methods
# ------------------------------------------------------------------
def get_titles(self) -> Union[Movies, Series]: def get_titles(self) -> Union[Movies, Series]:
graphql_url = self.config["endpoints"]["graphql_url"] match = re.match(self.TITLE_RE, self.title)
if not match:
raise ValueError(f"Could not parse ID from title — is the URL correct? ({self.title})")
try: kind = match.group("kind")
kind, show_id, season_id, episode_id = ( show_id = match.group("show_id")
re.match(self.TITLE_RE, self.title).group(i) for i in ("kind", "show_id", "season_id", "episode_id") season_id = match.group("season_id")
) episode_id = match.group("episode_id")
except Exception:
raise ValueError("Could not parse ID from title - is the URL correct?")
if not kind or not show_id: if not kind or not show_id:
self.log.error(f"Invalid title URL: {self.title}. 'kind' or 'show_id' is missing.") raise ValueError(f"Invalid title URL: 'kind' or 'show_id' is missing. ({self.title})")
raise ValueError("Invalid title URL: 'kind' or 'show_id' is missing.")
if kind == "filme": if kind == "filme":
content_id = f'rrn:watch:videohub:movie:{show_id}' return self._get_movie_titles(show_id)
elif kind in ("shows", "serien"):
return self._get_series_titles(show_id, season_id, episode_id)
response_data = self._execute_graphql_query('MovieDetail', {'id': content_id}, 'b1c360212cc518ddca2b8377813a54fa918ca424c08086204b7bf7d6ef626ac4') raise ValueError(f"Unsupported content type: '{kind}'.")
if 'movie' in response_data: def _get_movie_titles(self, show_id: str) -> Movies:
movie_data = response_data['movie'] """Fetch and return movie title data."""
content_id = f'rrn:watch:videohub:movie:{show_id}'
if 'id' not in movie_data or 'title' not in movie_data or 'productionYear' not in movie_data: response_data = self._execute_graphql_query(
self.log.error("Invalid movie_data data received.") 'MovieDetail', {'id': content_id},
raise ValueError("Invalid movie_data data received from RTL+ service.") 'b1c360212cc518ddca2b8377813a54fa918ca424c08086204b7bf7d6ef626ac4',
)
self.log.debug(f"Movie ID: {content_id}, Title: {movie_data['title']}") movie_data = response_data.get('movie')
if not movie_data:
raise ValueError("Failed to fetch movie data from RTL+ service.")
return Movies( self._validate_required_fields(movie_data, ['id', 'title', 'productionYear'], 'movie_data')
[
Movie(
id_=content_id,
service=self.__class__,
name=movie_data['title'],
data=movie_data,
year=movie_data['productionYear'],
)
]
)
self.log.error(f"Failed to fetch Movie data: {response.status_code} - {response.text}") self.log.debug(f"Movie ID: {content_id}, Title: {movie_data['title']}")
raise EnvironmentError("Failed to fetch Movie data from RTL+ service.")
if kind == "shows" or kind == "serien": return Movies([
if episode_id: Movie(
content_id = f'rrn:watch:videohub:episode:{episode_id}' id_=content_id,
service=self.__class__,
name=movie_data['title'],
data=movie_data,
year=movie_data['productionYear'],
)
])
response_data = self._execute_graphql_query('EpisodeDetail', {'episodeId': content_id}, '2e5ef142c79f8620e8e93c8f21b31a463b16d89a557f7f5f0c4a7e063be96a8a') def _get_series_titles(self, show_id: str, season_id: Optional[str], episode_id: Optional[str]) -> Series:
"""Route to the appropriate series fetch method based on URL specificity."""
if episode_id:
return self._get_single_episode(episode_id)
elif season_id:
return self._get_single_season(season_id)
elif show_id:
return self._get_full_show(show_id)
if 'episode' in response_data: raise ValueError("No valid identifier found in series URL.")
episode_data = response_data['episode']
if 'id' not in episode_data or 'title' not in episode_data or 'number' not in episode_data or 'episodeSeason' not in episode_data: def _get_single_episode(self, episode_id: str) -> Series:
self.log.error("Invalid episode data received.") """Fetch a single episode by its ID."""
raise ValueError("Invalid episode data received from RTL+ service.") content_id = f'rrn:watch:videohub:episode:{episode_id}'
if 'format' not in episode_data and 'title' not in episode_data['format']: response_data = self._execute_graphql_query(
self.log.error("Invalid episode format received.") 'EpisodeDetail', {'episodeId': content_id},
raise ValueError("Invalid episode format received from RTL+ service.") '2e5ef142c79f8620e8e93c8f21b31a463b16d89a557f7f5f0c4a7e063be96a8a',
)
return Series( episode_data = response_data.get('episode')
[ if not episode_data:
Episode( raise ValueError("Failed to fetch episode data from RTL+ service.")
id_=content_id,
service=self.__class__,
title=episode_data['format']['title'],
season=self.get_episode_session(episode_data),
number=episode_data['number'],
name=episode_data['title'],
data=episode_data,
)
]
)
elif season_id: self._validate_required_fields(episode_data, ['id', 'title', 'number', 'episodeSeason'], 'episode_data')
content_id = f'rrn:watch:videohub:season:{season_id}'
response_data = self._execute_graphql_query('SeasonWithFormatAndEpisodes', {'seasonId': content_id}, 'cc0fbbe17143f549a35efa6f8665ceb9b1cfae44b590f0b2381a9a304304c584') if 'format' not in episode_data or 'title' not in episode_data['format']:
raise ValueError("Missing 'format.title' in episode_data.")
if 'season' in response_data: return Series([
season_data = response_data['season'] Episode(
id_=content_id,
service=self.__class__,
title=episode_data['format']['title'],
season=self._get_episode_season(episode_data),
number=episode_data['number'],
name=episode_data['title'],
data=episode_data,
)
])
if 'format' not in season_data or 'title' not in season_data['format']: def _get_single_season(self, season_id: str) -> Series:
self.log.error("Invalid season format received.") """Fetch all episodes from a single season."""
raise ValueError("Invalid season format received from RTL+ service.") content_id = f'rrn:watch:videohub:season:{season_id}'
if not 'episodes' in season_data or not isinstance(season_data['episodes'], list): response_data = self._execute_graphql_query(
self.log.error("Invalid season data received.") 'SeasonWithFormatAndEpisodes', {'seasonId': content_id},
raise ValueError("Invalid season data received from RTL+ service.") 'cc0fbbe17143f549a35efa6f8665ceb9b1cfae44b590f0b2381a9a304304c584',
)
episodes = [] season_data = response_data.get('season')
for episode in season_data['episodes']: if not season_data:
if 'id' not in episode or 'title' not in episode or 'number' not in episode or 'episodeSeason' not in episode: raise ValueError("Failed to fetch season data from RTL+ service.")
self.log.error("Invalid episode data received.")
raise ValueError("Invalid episode data received from RTL+ service.")
episodes.append( if 'format' not in season_data or 'title' not in season_data['format']:
Episode( raise ValueError("Missing 'format.title' in season_data.")
id_=episode['id'],
service=self.__class__,
title=season_data['format']['title'],
season=self.get_episode_session(episode),
number=episode['number'],
name=episode['title'],
data=episode,
)
)
return Series( if 'episodes' not in season_data or not isinstance(season_data['episodes'], list):
episodes raise ValueError("Missing or invalid 'episodes' in season_data.")
)
elif show_id: episodes = []
content_id = f'rrn:watch:videohub:format:{show_id}' for episode in season_data['episodes']:
self._validate_required_fields(episode, ['id', 'title', 'number', 'episodeSeason'], 'episode')
response_data = self._execute_graphql_query('Format', {'id': content_id}, 'd112638c0184ab5698af7b69532dfe2f12973f7af9cb137b9f70278130b1eafa') episodes.append(Episode(
id_=episode['id'],
service=self.__class__,
title=season_data['format']['title'],
season=self._get_episode_season(episode),
number=episode['number'],
name=episode['title'],
data=episode,
))
if 'format' in response_data: return Series(episodes)
format_data = response_data['format']
if 'title' not in format_data or 'id' not in format_data: def _get_full_show(self, show_id: str) -> Series:
self.log.error("Invalid format data received.") """Fetch all episodes across all seasons of a show/format."""
raise ValueError("Invalid format data received from RTL+ service.") content_id = f'rrn:watch:videohub:format:{show_id}'
if 'seasons' not in format_data or not isinstance(format_data['seasons'], list): response_data = self._execute_graphql_query(
self.log.error("Invalid format seasons data received.") 'Format', {'id': content_id},
raise ValueError("Invalid format seasons data received from RTL+ service.") 'd112638c0184ab5698af7b69532dfe2f12973f7af9cb137b9f70278130b1eafa',
)
episodes = [] format_data = response_data.get('format')
for season in format_data['seasons']: if not format_data:
if not 'id' in season or not 'seasonType' in season: raise ValueError("Failed to fetch format data from RTL+ service.")
self.log.error("Invalid season data received.")
raise ValueError("Invalid season data received from RTL+ service.")
season_id = season['id'] self._validate_required_fields(format_data, ['id', 'title', 'seasons'], 'format_data')
response_data = self._execute_graphql_query('SeasonWithFormatAndEpisodes', {'seasonId': season_id}, 'cc0fbbe17143f549a35efa6f8665ceb9b1cfae44b590f0b2381a9a304304c584') if not isinstance(format_data['seasons'], list):
raise ValueError("Invalid 'seasons' in format_data (expected list).")
if 'season' in response_data: episodes = []
season_data = response_data['season'] for season in format_data['seasons']:
self._validate_required_fields(season, ['id', 'seasonType'], 'season')
if 'format' not in season_data or 'title' not in season_data['format']: season_response = self._execute_graphql_query(
self.log.error("Invalid season format received.") 'SeasonWithFormatAndEpisodes', {'seasonId': season['id']},
raise ValueError("Invalid season format received from RTL+ service.") 'cc0fbbe17143f549a35efa6f8665ceb9b1cfae44b590f0b2381a9a304304c584',
)
if not 'episodes' in season_data or not isinstance(season_data['episodes'], list): season_data = season_response.get('season')
self.log.error("Invalid season data received.") if not season_data:
raise ValueError("Invalid season data received from RTL+ service.") continue
for episode in season_data['episodes']: if 'format' not in season_data or 'title' not in season_data['format']:
if 'id' not in episode or 'title' not in episode or 'number' not in episode or 'episodeSeason' not in episode: raise ValueError("Missing 'format.title' in season_data.")
self.log.error("Invalid episode data received.")
raise ValueError("Invalid episode data received from RTL+ service.")
episodes.append( if 'episodes' not in season_data or not isinstance(season_data['episodes'], list):
Episode( raise ValueError("Missing or invalid 'episodes' in season_data.")
id_=episode['id'],
service=self.__class__,
title=season_data['format']['title'],
season=self.get_episode_session(episode),
number=episode['number'],
name=episode['title'],
data=episode,
)
)
return Series( for episode in season_data['episodes']:
episodes self._validate_required_fields(episode, ['id', 'title', 'number', 'episodeSeason'], 'episode')
)
self.log.error(f"Failed to fetch series data: {response.status_code} - {response.text}") episodes.append(Episode(
raise EnvironmentError("Failed to fetch series data from RTL+ service.") id_=episode['id'],
service=self.__class__,
title=season_data['format']['title'],
season=self._get_episode_season(episode),
number=episode['number'],
name=episode['title'],
data=episode,
))
return Series(episodes)
def get_tracks(self, title: Union[Episode, Movie]) -> Tracks: def get_tracks(self, title: Union[Episode, Movie]) -> Tracks:
playout_url = self.config["endpoints"]["playout"] if not isinstance(title, (Episode, Movie)):
raise TypeError(f"Expected Episode or Movie, got {type(title).__name__}.")
if isinstance(title, Episode) or isinstance(title, Movie): playout_url = self.config["endpoints"]["playout"].format(id=title.data['id'])
playout_url = playout_url.format(id=title.data['id'])
else:
self.log.error(f"Unsupported title type: {type(title)}. Expected Series or Movies.")
raise ValueError(f"Unsupported title type: {type(title)}. Expected Series or Movies.")
response = self.session.get( response = self.session.get(
playout_url, playout_url,
headers={ headers={
**self.session.headers, 'x-auth-token': self._rtlp_auth_jwt,
'x-auth-token': self._rtlp_auth_jwt
}
)
if response and response.status_code == 200:
response_data = response.json()
all_parsed_tracks = [] # Use a list to collect all individual track objects
for variant in response_data:
if 'name' not in variant:
self.log.error("Invalid playout variant data received.")
raise ValueError("Invalid playout variant data received from RTL+ service.")
# Assuming 'dashsd' and 'dashhd' variants contain the MPD URLs
if variant['name'] == 'dashhd':
if 'sources' not in variant and len(variant['sources']) == 0:
self.log.warning(f"Variant '{variant['name']}' has no sources. Skipping.")
continue
source_entry = variant['sources'][0]
# Assuming the 'url' key in each source_entry is the DASH manifest URL
if not 'url' in source_entry:
self.log.warning(f"DASH source entry missing 'url': {source_entry}. Skipping.")
continue
manifest_url = source_entry['url']
try:
all_parsed_tracks = DASH.from_url(manifest_url, self.session).to_tracks(language="de") # Use title's language for filtering/tagging
except Exception as e:
self.log.error(f"Failed to parse DASH manifest from {manifest_url}: {e}")
# Decide if you want to raise or just log and continue for other manifests
continue
# Return a new Tracks object containing all collected tracks
return Tracks(all_parsed_tracks)
else:
self.log.error(f"Failed to fetch tracks data: {response.status_code} - {response.text}")
raise EnvironmentError("Failed to fetch tracks data from RTL+ service.")
def get_chapters(self, title: Union[Movies, Series]) -> list[Chapter]:
# technically optional, but you must define and at least `return []`.
return Chapters()
def get_widevine_service_certificate(self, *, challenge: bytes, title: Union[Movies, Series], track: AnyTrack) -> Union[bytes, str]:
return None
def get_widevine_license(self, *, challenge: bytes, title: Union[Movies, Series], track: AnyTrack) -> Optional[Union[bytes, str]]:
license_url = self.config["endpoints"]["license"]
response = self.session.post(
license_url,
headers={
**self.session.headers,
'Content-Type': 'application/octet-stream',
'x-auth-token': self._rtlp_auth_jwt
}, },
data=challenge
)
if response.status_code != 200:
self.log.error(f"Failed to fetch license: {response.status_code} - {response.text}")
raise ConnectionError(response.text)
self.log.info("Successfully fetched Widevine license from RTL+ service.")
return response.content
def _execute_graphql_query(self, operation_name: str, variables: dict, persisted_query_hash: str) -> dict:
response = self.session.get(
self.config["endpoints"]["graphql_url"],
headers={
**self.session.headers,
'Authorization': f'Bearer {self._rtlp_auth_jwt}'
},
params={
'operationName': operation_name,
'variables': json.dumps(variables).encode(),
'extensions': json.dumps({
'persistedQuery': {'version': 1, 'sha256Hash': persisted_query_hash}
}).encode()
}
) )
response.raise_for_status() response.raise_for_status()
response_data = response.json() response_data = response.json()
all_parsed_tracks = []
for variant in response_data:
if 'name' not in variant:
raise ValueError("Invalid playout variant data: missing 'name'.")
if variant['name'] != 'dashhd':
continue
if 'sources' not in variant or len(variant['sources']) == 0:
self.log.warning(f"Variant '{variant['name']}' has no sources. Skipping.")
continue
source_entry = variant['sources'][0]
if 'url' not in source_entry:
self.log.warning(f"DASH source entry missing 'url': {source_entry}. Skipping.")
continue
manifest_url = source_entry['url']
try:
all_parsed_tracks = DASH.from_url(manifest_url, self.session).to_tracks(language="de")
except Exception as e:
self.log.error(f"Failed to parse DASH manifest from {manifest_url}: {e}")
continue
return Tracks(all_parsed_tracks)
def get_chapters(self, title: Union[Movies, Series]) -> list:
return Chapters()
def get_widevine_service_certificate(self, *, challenge: bytes, title: Union[Movies, Series], track: AnyTrack) -> Union[bytes, str]:
return None
def get_widevine_license(
self, *, challenge: bytes, title: Union[Movies, Series], track: AnyTrack,
) -> Optional[Union[bytes, str]]:
"""Obtain a Widevine license for the given track."""
license_url = self.config["endpoints"]["license"]
response = self.session.post(
license_url,
headers={
'Content-Type': 'application/octet-stream',
'x-auth-token': self._rtlp_auth_jwt,
},
data=challenge,
)
response.raise_for_status()
self.log.info("Successfully fetched Widevine license.")
return response.content
# ------------------------------------------------------------------
# GraphQL & utility helpers
# ------------------------------------------------------------------
def _execute_graphql_query(self, operation_name: str, variables: dict, persisted_query_hash: str) -> dict:
"""Execute a persisted GraphQL query and return the 'data' payload."""
response = self.session.get(
self.config["endpoints"]["graphql_url"],
headers={
'Authorization': f'Bearer {self._rtlp_auth_jwt}',
},
params={
'operationName': operation_name,
'variables': json.dumps(variables).encode(),
'extensions': json.dumps({
'persistedQuery': {'version': 1, 'sha256Hash': persisted_query_hash},
}).encode(),
},
)
response.raise_for_status()
response_data = response.json()
if 'data' not in response_data: if 'data' not in response_data:
self.log.error(f"GraphQL response for '{operation_name}' missing 'data' field.") raise ValueError(f"Invalid GraphQL response for '{operation_name}': missing 'data' field.")
raise ValueError(f"Invalid GraphQL response for '{operation_name}'.")
return response_data['data'] return response_data['data']
def get_episode_session(self, episode) -> int: @staticmethod
season_value = None def _get_episode_season(episode: dict) -> Optional[int]:
if 'seasonType' not in episode['episodeSeason']: """Extract the season number (ordinal or year) from episode data."""
self.log.error("Invalid episode season received.") episode_season = episode.get('episodeSeason', {})
raise ValueError("Invalid episode season received from RTL+ service.") season_type = episode_season.get('seasonType')
seasonType = episode['episodeSeason']['seasonType'] if not season_type:
raise ValueError("Missing 'seasonType' in episode season data.")
if seasonType == 'ANNUAL': season_info = episode_season.get('season', {})
if 'season' in episode['episodeSeason'] and 'year' in episode['episodeSeason']['season']:
season_value = int(episode['episodeSeason']['season']['year'])
elif seasonType == 'ORDINAL':
if 'season' in episode['episodeSeason'] and 'ordinal' in episode['episodeSeason']['season']:
season_value = int(episode['episodeSeason']['season']['ordinal'])
else:
self.log.error(f"Unknown season type '{seasonType}' received.")
raise ValueError(f"Unknown season type '{seasonType}' received from RTL+ service.")
return season_value if season_type == 'ANNUAL':
year = season_info.get('year')
return int(year) if year is not None else None
elif season_type == 'ORDINAL':
ordinal = season_info.get('ordinal')
return int(ordinal) if ordinal is not None else None
raise ValueError(f"Unknown season type '{season_type}'.")