Files
devine_services/RTLP/__init__.py
blackicedbear ffbd6894c2 feat(RTLP): add premium bypass option to download premium-only content
By default, content with tier 'PREMIUM' is skipped. Use --premium-bypass
flag to download premium movies and episodes.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-10 12:22:24 +01:00

490 lines
19 KiB
Python

from __future__ import annotations
import base64
import hashlib
import json
import re
import secrets
from http.cookiejar import MozillaCookieJar
from typing import Any, Optional, Union
import click
import requests
from devine.core.constants import AnyTrack
from devine.core.credential import Credential
from devine.core.manifests import DASH
from devine.core.service import Service
from devine.core.titles import Episode, Movie, Movies, Series
from devine.core.tracks import Chapters, Tracks
class RTLP(Service):
"""RTL+ (plus.rtl.de) streaming service."""
# List of Service Aliases. Do NOT include the Service Tag. All aliases must be lowercase.
ALIASES = ()
# List of regions of which the service offers support for.
GEOFENCE = ("de", "at", "ch")
TITLE_RE = r"^https?:\/\/plus\.rtl\.de\/video-tv\/(?P<kind>shows|serien|filme)\/(?:[^\/]+-)?(?P<show_id>\d+)(?:\/[^\/]+-)?(?P<season_id>\d+)?(?:\/[^\/]+-)?(?P<episode_id>\d+)?$"
AUTH_CODE_REGEX = r"code=([\w-]+\.[\w-]+\.[\w-]+)"
@staticmethod
@click.command(name="RTLP", short_help="https://plus.rtl.de", help=__doc__)
@click.argument("title", type=str)
@click.option("--premium-bypass", is_flag=True, default=False, help="Download premium-only content (default: skip premium content).")
@click.pass_context
def cli(ctx: click.Context, **kwargs: Any) -> RTLP:
return RTLP(ctx, **kwargs)
def __init__(self, ctx: click.Context, title: str, premium_bypass: bool = False):
self.title = title
self.premium_bypass = premium_bypass
super().__init__(ctx)
def get_session(self) -> requests.Session:
session = super().get_session()
session.headers.update({
'Accept': '*/*',
'Accept-Language': 'de,de-DE;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
'Origin': 'https://plus.rtl.de',
'Referer': 'https://plus.rtl.de/',
'Rtlplus-Client-Id': 'rci:rtlplus:web',
'Rtlplus-Client-Version': '2024.7.29.2',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36 Edg/137.0.0.0',
})
return session
def authenticate(self, cookies: Optional[MozillaCookieJar] = None, credential: Optional[Credential] = None) -> None:
super().authenticate(cookies, credential) # important
if not cookies:
self._authenticate_anonymous()
else:
self._authenticate_with_cookies(cookies)
# ------------------------------------------------------------------
# Authentication helpers
# ------------------------------------------------------------------
@staticmethod
def _generate_code_verifier() -> str:
"""Generate a PKCE code verifier."""
return secrets.token_urlsafe(64)
@staticmethod
def _generate_code_challenge(verifier: str) -> str:
"""Generate a PKCE code challenge from a verifier."""
sha256_hash = hashlib.sha256(verifier.encode()).digest()
return base64.urlsafe_b64encode(sha256_hash).decode().rstrip("=")
def _authenticate_with_cookies(self, cookies: MozillaCookieJar) -> None:
auth_url = self.config["endpoints"]["auth_url"]
token_url = self.config["endpoints"]["token_url"]
code_verifier = self._generate_code_verifier()
code_challenge = self._generate_code_challenge(code_verifier)
redirect_url_request = self.session.get(
auth_url,
params={
'client_id': 'rtlplus-web',
'redirect_uri': 'https://plus.rtl.de/silent-check-sso.html',
'response_type': 'code',
'scope': 'openid',
'code_challenge_method': 'S256',
'code_challenge': code_challenge,
},
cookies=cookies,
)
redirect_url_request.raise_for_status()
redirect_url = redirect_url_request.url
auth_code_match = re.search(self.AUTH_CODE_REGEX, redirect_url)
if not auth_code_match:
raise RuntimeError("Authorization code not found in redirect URL.")
auth_code = auth_code_match.group(1)
self.log.debug(f"Auth Code: {auth_code}")
response = self.session.post(
token_url,
headers={
'Content-Type': 'application/x-www-form-urlencoded',
},
cookies=cookies,
data=bytes(
f'grant_type=authorization_code&client_id=rtlplus-web'
f'&redirect_uri=https%3A%2F%2Fplus.rtl.de%2Fsilent-check-sso.html'
f'&code={auth_code}&code_verifier={code_verifier}',
'utf-8',
),
)
response.raise_for_status()
auth_response = response.json()
if 'access_token' not in auth_response:
raise RuntimeError("Cookie authentication failed: no access token in response.")
self._rtlp_auth_jwt = auth_response['access_token']
self.log.info("Successfully authenticated with cookies.")
def _authenticate_anonymous(self) -> None:
token_url = self.config["endpoints"]["token_url"]
client_secret = self.config["client"]["secret"]
response = self.session.post(
token_url,
headers={
'Content-Type': 'application/x-www-form-urlencoded',
},
data=bytes(
f'grant_type=client_credentials&client_id={self.config["client"]["id"]}'
f'&client_secret={client_secret}',
'utf-8',
),
)
response.raise_for_status()
auth_response = response.json()
if 'access_token' not in auth_response:
raise RuntimeError("Anonymous authentication failed: no access token in response.")
self._rtlp_auth_jwt = auth_response['access_token']
self.log.info("Authenticated anonymously with RTL+ service.")
# ------------------------------------------------------------------
# Validation helpers
# ------------------------------------------------------------------
@staticmethod
def _validate_required_fields(data: dict, fields: list[str], context: str) -> None:
"""Raise ValueError if any required fields are missing from data."""
missing = [f for f in fields if f not in data]
if missing:
raise ValueError(f"Missing required fields {missing} in {context}.")
# ------------------------------------------------------------------
# Required service methods
# ------------------------------------------------------------------
def get_titles(self) -> Union[Movies, Series]:
match = re.match(self.TITLE_RE, self.title)
if not match:
raise ValueError(f"Could not parse ID from title — is the URL correct? ({self.title})")
kind = match.group("kind")
show_id = match.group("show_id")
season_id = match.group("season_id")
episode_id = match.group("episode_id")
if not kind or not show_id:
raise ValueError(f"Invalid title URL: 'kind' or 'show_id' is missing. ({self.title})")
if kind == "filme":
return self._get_movie_titles(show_id)
elif kind in ("shows", "serien"):
return self._get_series_titles(show_id, season_id, episode_id)
raise ValueError(f"Unsupported content type: '{kind}'.")
def _get_movie_titles(self, show_id: str) -> Movies:
"""Fetch and return movie title data."""
content_id = f'rrn:watch:videohub:movie:{show_id}'
response_data = self._execute_graphql_query(
'MovieDetail', {'id': content_id},
'b1c360212cc518ddca2b8377813a54fa918ca424c08086204b7bf7d6ef626ac4',
)
movie_data = response_data.get('movie')
if not movie_data:
raise ValueError("Failed to fetch movie data from RTL+ service.")
self._validate_required_fields(movie_data, ['id', 'title', 'productionYear'], 'movie_data')
self.log.debug(f"Movie ID: {content_id}, Title: {movie_data['title']}")
if self._is_premium(movie_data) and not self.premium_bypass:
self.log.info(f"Skipping premium movie: {movie_data['title']} (use --premium-bypass to download)")
return Movies([])
return Movies([
Movie(
id_=content_id,
service=self.__class__,
name=movie_data['title'],
data=movie_data,
year=movie_data['productionYear'],
)
])
def _get_series_titles(self, show_id: str, season_id: Optional[str], episode_id: Optional[str]) -> Series:
"""Route to the appropriate series fetch method based on URL specificity."""
if episode_id:
return self._get_single_episode(episode_id)
elif season_id:
return self._get_single_season(season_id)
elif show_id:
return self._get_full_show(show_id)
raise ValueError("No valid identifier found in series URL.")
def _get_single_episode(self, episode_id: str) -> Series:
"""Fetch a single episode by its ID."""
content_id = f'rrn:watch:videohub:episode:{episode_id}'
response_data = self._execute_graphql_query(
'EpisodeDetail', {'episodeId': content_id},
'2e5ef142c79f8620e8e93c8f21b31a463b16d89a557f7f5f0c4a7e063be96a8a',
)
episode_data = response_data.get('episode')
if not episode_data:
raise ValueError("Failed to fetch episode data from RTL+ service.")
self._validate_required_fields(episode_data, ['id', 'title', 'number', 'episodeSeason'], 'episode_data')
if 'format' not in episode_data or 'title' not in episode_data['format']:
raise ValueError("Missing 'format.title' in episode_data.")
if self._is_premium(episode_data) and not self.premium_bypass:
self.log.info(f"Skipping premium episode: {episode_data['title']} (use --premium-bypass to download)")
return Series([])
return Series([
Episode(
id_=content_id,
service=self.__class__,
title=episode_data['format']['title'],
season=self._get_episode_season(episode_data),
number=episode_data['number'],
name=episode_data['title'],
data=episode_data,
)
])
def _get_single_season(self, season_id: str) -> Series:
"""Fetch all episodes from a single season."""
content_id = f'rrn:watch:videohub:season:{season_id}'
response_data = self._execute_graphql_query(
'SeasonWithFormatAndEpisodes', {'seasonId': content_id},
'cc0fbbe17143f549a35efa6f8665ceb9b1cfae44b590f0b2381a9a304304c584',
)
season_data = response_data.get('season')
if not season_data:
raise ValueError("Failed to fetch season data from RTL+ service.")
if 'format' not in season_data or 'title' not in season_data['format']:
raise ValueError("Missing 'format.title' in season_data.")
if 'episodes' not in season_data or not isinstance(season_data['episodes'], list):
raise ValueError("Missing or invalid 'episodes' in season_data.")
episodes = []
for episode in season_data['episodes']:
self._validate_required_fields(episode, ['id', 'title', 'number', 'episodeSeason'], 'episode')
if self._is_premium(episode) and not self.premium_bypass:
self.log.info(f"Skipping premium episode: {episode['title']} (use --premium-bypass to download)")
continue
episodes.append(Episode(
id_=episode['id'],
service=self.__class__,
title=season_data['format']['title'],
season=self._get_episode_season(episode),
number=episode['number'],
name=episode['title'],
data=episode,
))
return Series(episodes)
def _get_full_show(self, show_id: str) -> Series:
"""Fetch all episodes across all seasons of a show/format."""
content_id = f'rrn:watch:videohub:format:{show_id}'
response_data = self._execute_graphql_query(
'Format', {'id': content_id},
'd112638c0184ab5698af7b69532dfe2f12973f7af9cb137b9f70278130b1eafa',
)
format_data = response_data.get('format')
if not format_data:
raise ValueError("Failed to fetch format data from RTL+ service.")
self._validate_required_fields(format_data, ['id', 'title', 'seasons'], 'format_data')
if not isinstance(format_data['seasons'], list):
raise ValueError("Invalid 'seasons' in format_data (expected list).")
episodes = []
for season in format_data['seasons']:
self._validate_required_fields(season, ['id', 'seasonType'], 'season')
season_response = self._execute_graphql_query(
'SeasonWithFormatAndEpisodes', {'seasonId': season['id']},
'cc0fbbe17143f549a35efa6f8665ceb9b1cfae44b590f0b2381a9a304304c584',
)
season_data = season_response.get('season')
if not season_data:
continue
if 'format' not in season_data or 'title' not in season_data['format']:
raise ValueError("Missing 'format.title' in season_data.")
if 'episodes' not in season_data or not isinstance(season_data['episodes'], list):
raise ValueError("Missing or invalid 'episodes' in season_data.")
for episode in season_data['episodes']:
self._validate_required_fields(episode, ['id', 'title', 'number', 'episodeSeason'], 'episode')
if self._is_premium(episode) and not self.premium_bypass:
self.log.info(f"Skipping premium episode: {episode['title']} (use --premium-bypass to download)")
continue
episodes.append(Episode(
id_=episode['id'],
service=self.__class__,
title=season_data['format']['title'],
season=self._get_episode_season(episode),
number=episode['number'],
name=episode['title'],
data=episode,
))
return Series(episodes)
def get_tracks(self, title: Union[Episode, Movie]) -> Tracks:
if not isinstance(title, (Episode, Movie)):
raise TypeError(f"Expected Episode or Movie, got {type(title).__name__}.")
playout_url = self.config["endpoints"]["playout"].format(id=title.data['id'])
response = self.session.get(
playout_url,
headers={
'x-auth-token': self._rtlp_auth_jwt,
},
)
response.raise_for_status()
response_data = response.json()
all_parsed_tracks = []
for variant in response_data:
if 'name' not in variant:
raise ValueError("Invalid playout variant data: missing 'name'.")
if variant['name'] != 'dashhd':
continue
if 'sources' not in variant or len(variant['sources']) == 0:
self.log.warning(f"Variant '{variant['name']}' has no sources. Skipping.")
continue
source_entry = variant['sources'][0]
if 'url' not in source_entry:
self.log.warning(f"DASH source entry missing 'url': {source_entry}. Skipping.")
continue
manifest_url = source_entry['url']
try:
all_parsed_tracks = DASH.from_url(manifest_url, self.session).to_tracks(language="de")
except Exception as e:
self.log.error(f"Failed to parse DASH manifest from {manifest_url}: {e}")
continue
return Tracks(all_parsed_tracks)
def get_chapters(self, title: Union[Movies, Series]) -> list:
return Chapters()
def get_widevine_service_certificate(self, *, challenge: bytes, title: Union[Movies, Series], track: AnyTrack) -> Union[bytes, str]:
return None
def get_widevine_license(
self, *, challenge: bytes, title: Union[Movies, Series], track: AnyTrack,
) -> Optional[Union[bytes, str]]:
"""Obtain a Widevine license for the given track."""
license_url = self.config["endpoints"]["license"]
response = self.session.post(
license_url,
headers={
'Content-Type': 'application/octet-stream',
'x-auth-token': self._rtlp_auth_jwt,
},
data=challenge,
)
response.raise_for_status()
self.log.info("Successfully fetched Widevine license.")
return response.content
# ------------------------------------------------------------------
# GraphQL & utility helpers
# ------------------------------------------------------------------
def _execute_graphql_query(self, operation_name: str, variables: dict, persisted_query_hash: str) -> dict:
"""Execute a persisted GraphQL query and return the 'data' payload."""
response = self.session.get(
self.config["endpoints"]["graphql_url"],
headers={
'Authorization': f'Bearer {self._rtlp_auth_jwt}',
},
params={
'operationName': operation_name,
'variables': json.dumps(variables),
'extensions': json.dumps({
'persistedQuery': {'version': 1, 'sha256Hash': persisted_query_hash},
}).encode(),
},
)
response.raise_for_status()
response_data = response.json()
if response_data.get("errors"):
raise ValueError(f"GraphQL errors for '{operation_name}': {response_data['errors']}")
if 'data' not in response_data:
self.log.error(f"GraphQL response for '{operation_name}' missing 'data' field.")
raise ValueError(f"Invalid GraphQL response for '{operation_name}'.")
return response_data['data']
def _is_premium(self, data: dict) -> bool:
"""Check if content is premium-only (skipped unless premium_bypass is enabled)."""
return data.get('tier') == 'PREMIUM'
@staticmethod
def _get_episode_season(episode: dict) -> Optional[int]:
"""Extract the season number (ordinal or year) from episode data."""
episode_season = episode.get('episodeSeason', {})
season_type = episode_season.get('seasonType')
if not season_type:
raise ValueError("Missing 'seasonType' in episode season data.")
season_info = episode_season.get('season', {})
if season_type == 'ANNUAL':
year = season_info.get('year')
return int(year) if year is not None else None
elif season_type == 'ORDINAL':
ordinal = season_info.get('ordinal')
return int(ordinal) if ordinal is not None else None
raise ValueError(f"Unknown season type '{season_type}'.")