By default, content with tier 'PREMIUM' is skipped. Use --premium-bypass flag to download premium movies and episodes. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
490 lines
19 KiB
Python
490 lines
19 KiB
Python
from __future__ import annotations
|
|
|
|
import base64
|
|
import hashlib
|
|
import json
|
|
import re
|
|
import secrets
|
|
from http.cookiejar import MozillaCookieJar
|
|
from typing import Any, Optional, Union
|
|
|
|
import click
|
|
import requests
|
|
|
|
from devine.core.constants import AnyTrack
|
|
from devine.core.credential import Credential
|
|
from devine.core.manifests import DASH
|
|
from devine.core.service import Service
|
|
from devine.core.titles import Episode, Movie, Movies, Series
|
|
from devine.core.tracks import Chapters, Tracks
|
|
|
|
|
|
class RTLP(Service):
|
|
"""RTL+ (plus.rtl.de) streaming service."""
|
|
|
|
# List of Service Aliases. Do NOT include the Service Tag. All aliases must be lowercase.
|
|
ALIASES = ()
|
|
|
|
# List of regions of which the service offers support for.
|
|
GEOFENCE = ("de", "at", "ch")
|
|
|
|
TITLE_RE = r"^https?:\/\/plus\.rtl\.de\/video-tv\/(?P<kind>shows|serien|filme)\/(?:[^\/]+-)?(?P<show_id>\d+)(?:\/[^\/]+-)?(?P<season_id>\d+)?(?:\/[^\/]+-)?(?P<episode_id>\d+)?$"
|
|
AUTH_CODE_REGEX = r"code=([\w-]+\.[\w-]+\.[\w-]+)"
|
|
|
|
@staticmethod
|
|
@click.command(name="RTLP", short_help="https://plus.rtl.de", help=__doc__)
|
|
@click.argument("title", type=str)
|
|
@click.option("--premium-bypass", is_flag=True, default=False, help="Download premium-only content (default: skip premium content).")
|
|
@click.pass_context
|
|
def cli(ctx: click.Context, **kwargs: Any) -> RTLP:
|
|
return RTLP(ctx, **kwargs)
|
|
|
|
def __init__(self, ctx: click.Context, title: str, premium_bypass: bool = False):
|
|
self.title = title
|
|
self.premium_bypass = premium_bypass
|
|
super().__init__(ctx)
|
|
|
|
def get_session(self) -> requests.Session:
|
|
session = super().get_session()
|
|
|
|
session.headers.update({
|
|
'Accept': '*/*',
|
|
'Accept-Language': 'de,de-DE;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
|
|
'Origin': 'https://plus.rtl.de',
|
|
'Referer': 'https://plus.rtl.de/',
|
|
'Rtlplus-Client-Id': 'rci:rtlplus:web',
|
|
'Rtlplus-Client-Version': '2024.7.29.2',
|
|
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36 Edg/137.0.0.0',
|
|
})
|
|
|
|
return session
|
|
|
|
def authenticate(self, cookies: Optional[MozillaCookieJar] = None, credential: Optional[Credential] = None) -> None:
|
|
super().authenticate(cookies, credential) # important
|
|
|
|
if not cookies:
|
|
self._authenticate_anonymous()
|
|
else:
|
|
self._authenticate_with_cookies(cookies)
|
|
|
|
# ------------------------------------------------------------------
|
|
# Authentication helpers
|
|
# ------------------------------------------------------------------
|
|
|
|
@staticmethod
|
|
def _generate_code_verifier() -> str:
|
|
"""Generate a PKCE code verifier."""
|
|
return secrets.token_urlsafe(64)
|
|
|
|
@staticmethod
|
|
def _generate_code_challenge(verifier: str) -> str:
|
|
"""Generate a PKCE code challenge from a verifier."""
|
|
sha256_hash = hashlib.sha256(verifier.encode()).digest()
|
|
return base64.urlsafe_b64encode(sha256_hash).decode().rstrip("=")
|
|
|
|
def _authenticate_with_cookies(self, cookies: MozillaCookieJar) -> None:
|
|
auth_url = self.config["endpoints"]["auth_url"]
|
|
token_url = self.config["endpoints"]["token_url"]
|
|
|
|
code_verifier = self._generate_code_verifier()
|
|
code_challenge = self._generate_code_challenge(code_verifier)
|
|
|
|
redirect_url_request = self.session.get(
|
|
auth_url,
|
|
params={
|
|
'client_id': 'rtlplus-web',
|
|
'redirect_uri': 'https://plus.rtl.de/silent-check-sso.html',
|
|
'response_type': 'code',
|
|
'scope': 'openid',
|
|
'code_challenge_method': 'S256',
|
|
'code_challenge': code_challenge,
|
|
},
|
|
cookies=cookies,
|
|
)
|
|
redirect_url_request.raise_for_status()
|
|
|
|
redirect_url = redirect_url_request.url
|
|
|
|
auth_code_match = re.search(self.AUTH_CODE_REGEX, redirect_url)
|
|
if not auth_code_match:
|
|
raise RuntimeError("Authorization code not found in redirect URL.")
|
|
|
|
auth_code = auth_code_match.group(1)
|
|
self.log.debug(f"Auth Code: {auth_code}")
|
|
|
|
response = self.session.post(
|
|
token_url,
|
|
headers={
|
|
'Content-Type': 'application/x-www-form-urlencoded',
|
|
},
|
|
cookies=cookies,
|
|
data=bytes(
|
|
f'grant_type=authorization_code&client_id=rtlplus-web'
|
|
f'&redirect_uri=https%3A%2F%2Fplus.rtl.de%2Fsilent-check-sso.html'
|
|
f'&code={auth_code}&code_verifier={code_verifier}',
|
|
'utf-8',
|
|
),
|
|
)
|
|
response.raise_for_status()
|
|
|
|
auth_response = response.json()
|
|
if 'access_token' not in auth_response:
|
|
raise RuntimeError("Cookie authentication failed: no access token in response.")
|
|
|
|
self._rtlp_auth_jwt = auth_response['access_token']
|
|
self.log.info("Successfully authenticated with cookies.")
|
|
|
|
def _authenticate_anonymous(self) -> None:
|
|
token_url = self.config["endpoints"]["token_url"]
|
|
client_secret = self.config["client"]["secret"]
|
|
|
|
response = self.session.post(
|
|
token_url,
|
|
headers={
|
|
'Content-Type': 'application/x-www-form-urlencoded',
|
|
},
|
|
data=bytes(
|
|
f'grant_type=client_credentials&client_id={self.config["client"]["id"]}'
|
|
f'&client_secret={client_secret}',
|
|
'utf-8',
|
|
),
|
|
)
|
|
response.raise_for_status()
|
|
|
|
auth_response = response.json()
|
|
if 'access_token' not in auth_response:
|
|
raise RuntimeError("Anonymous authentication failed: no access token in response.")
|
|
|
|
self._rtlp_auth_jwt = auth_response['access_token']
|
|
self.log.info("Authenticated anonymously with RTL+ service.")
|
|
|
|
# ------------------------------------------------------------------
|
|
# Validation helpers
|
|
# ------------------------------------------------------------------
|
|
|
|
@staticmethod
|
|
def _validate_required_fields(data: dict, fields: list[str], context: str) -> None:
|
|
"""Raise ValueError if any required fields are missing from data."""
|
|
missing = [f for f in fields if f not in data]
|
|
if missing:
|
|
raise ValueError(f"Missing required fields {missing} in {context}.")
|
|
|
|
# ------------------------------------------------------------------
|
|
# Required service methods
|
|
# ------------------------------------------------------------------
|
|
|
|
def get_titles(self) -> Union[Movies, Series]:
|
|
match = re.match(self.TITLE_RE, self.title)
|
|
if not match:
|
|
raise ValueError(f"Could not parse ID from title — is the URL correct? ({self.title})")
|
|
|
|
kind = match.group("kind")
|
|
show_id = match.group("show_id")
|
|
season_id = match.group("season_id")
|
|
episode_id = match.group("episode_id")
|
|
|
|
if not kind or not show_id:
|
|
raise ValueError(f"Invalid title URL: 'kind' or 'show_id' is missing. ({self.title})")
|
|
|
|
if kind == "filme":
|
|
return self._get_movie_titles(show_id)
|
|
elif kind in ("shows", "serien"):
|
|
return self._get_series_titles(show_id, season_id, episode_id)
|
|
|
|
raise ValueError(f"Unsupported content type: '{kind}'.")
|
|
|
|
def _get_movie_titles(self, show_id: str) -> Movies:
|
|
"""Fetch and return movie title data."""
|
|
content_id = f'rrn:watch:videohub:movie:{show_id}'
|
|
|
|
response_data = self._execute_graphql_query(
|
|
'MovieDetail', {'id': content_id},
|
|
'b1c360212cc518ddca2b8377813a54fa918ca424c08086204b7bf7d6ef626ac4',
|
|
)
|
|
|
|
movie_data = response_data.get('movie')
|
|
if not movie_data:
|
|
raise ValueError("Failed to fetch movie data from RTL+ service.")
|
|
|
|
self._validate_required_fields(movie_data, ['id', 'title', 'productionYear'], 'movie_data')
|
|
|
|
self.log.debug(f"Movie ID: {content_id}, Title: {movie_data['title']}")
|
|
|
|
if self._is_premium(movie_data) and not self.premium_bypass:
|
|
self.log.info(f"Skipping premium movie: {movie_data['title']} (use --premium-bypass to download)")
|
|
return Movies([])
|
|
|
|
return Movies([
|
|
Movie(
|
|
id_=content_id,
|
|
service=self.__class__,
|
|
name=movie_data['title'],
|
|
data=movie_data,
|
|
year=movie_data['productionYear'],
|
|
)
|
|
])
|
|
|
|
def _get_series_titles(self, show_id: str, season_id: Optional[str], episode_id: Optional[str]) -> Series:
|
|
"""Route to the appropriate series fetch method based on URL specificity."""
|
|
if episode_id:
|
|
return self._get_single_episode(episode_id)
|
|
elif season_id:
|
|
return self._get_single_season(season_id)
|
|
elif show_id:
|
|
return self._get_full_show(show_id)
|
|
|
|
raise ValueError("No valid identifier found in series URL.")
|
|
|
|
def _get_single_episode(self, episode_id: str) -> Series:
|
|
"""Fetch a single episode by its ID."""
|
|
content_id = f'rrn:watch:videohub:episode:{episode_id}'
|
|
|
|
response_data = self._execute_graphql_query(
|
|
'EpisodeDetail', {'episodeId': content_id},
|
|
'2e5ef142c79f8620e8e93c8f21b31a463b16d89a557f7f5f0c4a7e063be96a8a',
|
|
)
|
|
|
|
episode_data = response_data.get('episode')
|
|
if not episode_data:
|
|
raise ValueError("Failed to fetch episode data from RTL+ service.")
|
|
|
|
self._validate_required_fields(episode_data, ['id', 'title', 'number', 'episodeSeason'], 'episode_data')
|
|
|
|
if 'format' not in episode_data or 'title' not in episode_data['format']:
|
|
raise ValueError("Missing 'format.title' in episode_data.")
|
|
|
|
if self._is_premium(episode_data) and not self.premium_bypass:
|
|
self.log.info(f"Skipping premium episode: {episode_data['title']} (use --premium-bypass to download)")
|
|
return Series([])
|
|
|
|
return Series([
|
|
Episode(
|
|
id_=content_id,
|
|
service=self.__class__,
|
|
title=episode_data['format']['title'],
|
|
season=self._get_episode_season(episode_data),
|
|
number=episode_data['number'],
|
|
name=episode_data['title'],
|
|
data=episode_data,
|
|
)
|
|
])
|
|
|
|
def _get_single_season(self, season_id: str) -> Series:
|
|
"""Fetch all episodes from a single season."""
|
|
content_id = f'rrn:watch:videohub:season:{season_id}'
|
|
|
|
response_data = self._execute_graphql_query(
|
|
'SeasonWithFormatAndEpisodes', {'seasonId': content_id},
|
|
'cc0fbbe17143f549a35efa6f8665ceb9b1cfae44b590f0b2381a9a304304c584',
|
|
)
|
|
|
|
season_data = response_data.get('season')
|
|
if not season_data:
|
|
raise ValueError("Failed to fetch season data from RTL+ service.")
|
|
|
|
if 'format' not in season_data or 'title' not in season_data['format']:
|
|
raise ValueError("Missing 'format.title' in season_data.")
|
|
|
|
if 'episodes' not in season_data or not isinstance(season_data['episodes'], list):
|
|
raise ValueError("Missing or invalid 'episodes' in season_data.")
|
|
|
|
episodes = []
|
|
for episode in season_data['episodes']:
|
|
self._validate_required_fields(episode, ['id', 'title', 'number', 'episodeSeason'], 'episode')
|
|
|
|
if self._is_premium(episode) and not self.premium_bypass:
|
|
self.log.info(f"Skipping premium episode: {episode['title']} (use --premium-bypass to download)")
|
|
continue
|
|
|
|
episodes.append(Episode(
|
|
id_=episode['id'],
|
|
service=self.__class__,
|
|
title=season_data['format']['title'],
|
|
season=self._get_episode_season(episode),
|
|
number=episode['number'],
|
|
name=episode['title'],
|
|
data=episode,
|
|
))
|
|
|
|
return Series(episodes)
|
|
|
|
def _get_full_show(self, show_id: str) -> Series:
|
|
"""Fetch all episodes across all seasons of a show/format."""
|
|
content_id = f'rrn:watch:videohub:format:{show_id}'
|
|
|
|
response_data = self._execute_graphql_query(
|
|
'Format', {'id': content_id},
|
|
'd112638c0184ab5698af7b69532dfe2f12973f7af9cb137b9f70278130b1eafa',
|
|
)
|
|
|
|
format_data = response_data.get('format')
|
|
if not format_data:
|
|
raise ValueError("Failed to fetch format data from RTL+ service.")
|
|
|
|
self._validate_required_fields(format_data, ['id', 'title', 'seasons'], 'format_data')
|
|
|
|
if not isinstance(format_data['seasons'], list):
|
|
raise ValueError("Invalid 'seasons' in format_data (expected list).")
|
|
|
|
episodes = []
|
|
for season in format_data['seasons']:
|
|
self._validate_required_fields(season, ['id', 'seasonType'], 'season')
|
|
|
|
season_response = self._execute_graphql_query(
|
|
'SeasonWithFormatAndEpisodes', {'seasonId': season['id']},
|
|
'cc0fbbe17143f549a35efa6f8665ceb9b1cfae44b590f0b2381a9a304304c584',
|
|
)
|
|
|
|
season_data = season_response.get('season')
|
|
if not season_data:
|
|
continue
|
|
|
|
if 'format' not in season_data or 'title' not in season_data['format']:
|
|
raise ValueError("Missing 'format.title' in season_data.")
|
|
|
|
if 'episodes' not in season_data or not isinstance(season_data['episodes'], list):
|
|
raise ValueError("Missing or invalid 'episodes' in season_data.")
|
|
|
|
for episode in season_data['episodes']:
|
|
self._validate_required_fields(episode, ['id', 'title', 'number', 'episodeSeason'], 'episode')
|
|
|
|
if self._is_premium(episode) and not self.premium_bypass:
|
|
self.log.info(f"Skipping premium episode: {episode['title']} (use --premium-bypass to download)")
|
|
continue
|
|
|
|
episodes.append(Episode(
|
|
id_=episode['id'],
|
|
service=self.__class__,
|
|
title=season_data['format']['title'],
|
|
season=self._get_episode_season(episode),
|
|
number=episode['number'],
|
|
name=episode['title'],
|
|
data=episode,
|
|
))
|
|
|
|
return Series(episodes)
|
|
|
|
def get_tracks(self, title: Union[Episode, Movie]) -> Tracks:
|
|
if not isinstance(title, (Episode, Movie)):
|
|
raise TypeError(f"Expected Episode or Movie, got {type(title).__name__}.")
|
|
|
|
playout_url = self.config["endpoints"]["playout"].format(id=title.data['id'])
|
|
|
|
response = self.session.get(
|
|
playout_url,
|
|
headers={
|
|
'x-auth-token': self._rtlp_auth_jwt,
|
|
},
|
|
)
|
|
response.raise_for_status()
|
|
|
|
response_data = response.json()
|
|
|
|
all_parsed_tracks = []
|
|
for variant in response_data:
|
|
if 'name' not in variant:
|
|
raise ValueError("Invalid playout variant data: missing 'name'.")
|
|
|
|
if variant['name'] != 'dashhd':
|
|
continue
|
|
|
|
if 'sources' not in variant or len(variant['sources']) == 0:
|
|
self.log.warning(f"Variant '{variant['name']}' has no sources. Skipping.")
|
|
continue
|
|
|
|
source_entry = variant['sources'][0]
|
|
|
|
if 'url' not in source_entry:
|
|
self.log.warning(f"DASH source entry missing 'url': {source_entry}. Skipping.")
|
|
continue
|
|
|
|
manifest_url = source_entry['url']
|
|
|
|
try:
|
|
all_parsed_tracks = DASH.from_url(manifest_url, self.session).to_tracks(language="de")
|
|
except Exception as e:
|
|
self.log.error(f"Failed to parse DASH manifest from {manifest_url}: {e}")
|
|
continue
|
|
|
|
return Tracks(all_parsed_tracks)
|
|
|
|
def get_chapters(self, title: Union[Movies, Series]) -> list:
|
|
return Chapters()
|
|
|
|
def get_widevine_service_certificate(self, *, challenge: bytes, title: Union[Movies, Series], track: AnyTrack) -> Union[bytes, str]:
|
|
return None
|
|
|
|
def get_widevine_license(
|
|
self, *, challenge: bytes, title: Union[Movies, Series], track: AnyTrack,
|
|
) -> Optional[Union[bytes, str]]:
|
|
"""Obtain a Widevine license for the given track."""
|
|
license_url = self.config["endpoints"]["license"]
|
|
|
|
response = self.session.post(
|
|
license_url,
|
|
headers={
|
|
'Content-Type': 'application/octet-stream',
|
|
'x-auth-token': self._rtlp_auth_jwt,
|
|
},
|
|
data=challenge,
|
|
)
|
|
response.raise_for_status()
|
|
|
|
self.log.info("Successfully fetched Widevine license.")
|
|
return response.content
|
|
|
|
# ------------------------------------------------------------------
|
|
# GraphQL & utility helpers
|
|
# ------------------------------------------------------------------
|
|
|
|
def _execute_graphql_query(self, operation_name: str, variables: dict, persisted_query_hash: str) -> dict:
|
|
"""Execute a persisted GraphQL query and return the 'data' payload."""
|
|
response = self.session.get(
|
|
self.config["endpoints"]["graphql_url"],
|
|
headers={
|
|
'Authorization': f'Bearer {self._rtlp_auth_jwt}',
|
|
},
|
|
params={
|
|
'operationName': operation_name,
|
|
'variables': json.dumps(variables),
|
|
'extensions': json.dumps({
|
|
'persistedQuery': {'version': 1, 'sha256Hash': persisted_query_hash},
|
|
}).encode(),
|
|
},
|
|
)
|
|
response.raise_for_status()
|
|
|
|
response_data = response.json()
|
|
|
|
if response_data.get("errors"):
|
|
raise ValueError(f"GraphQL errors for '{operation_name}': {response_data['errors']}")
|
|
|
|
if 'data' not in response_data:
|
|
self.log.error(f"GraphQL response for '{operation_name}' missing 'data' field.")
|
|
raise ValueError(f"Invalid GraphQL response for '{operation_name}'.")
|
|
|
|
return response_data['data']
|
|
|
|
def _is_premium(self, data: dict) -> bool:
|
|
"""Check if content is premium-only (skipped unless premium_bypass is enabled)."""
|
|
return data.get('tier') == 'PREMIUM'
|
|
|
|
@staticmethod
|
|
def _get_episode_season(episode: dict) -> Optional[int]:
|
|
"""Extract the season number (ordinal or year) from episode data."""
|
|
episode_season = episode.get('episodeSeason', {})
|
|
season_type = episode_season.get('seasonType')
|
|
|
|
if not season_type:
|
|
raise ValueError("Missing 'seasonType' in episode season data.")
|
|
|
|
season_info = episode_season.get('season', {})
|
|
|
|
if season_type == 'ANNUAL':
|
|
year = season_info.get('year')
|
|
return int(year) if year is not None else None
|
|
elif season_type == 'ORDINAL':
|
|
ordinal = season_info.get('ordinal')
|
|
return int(ordinal) if ordinal is not None else None
|
|
|
|
raise ValueError(f"Unknown season type '{season_type}'.") |