Compare commits

...

7 Commits

Author SHA1 Message Date
6c31323806 refactor(JOYNDE): improve code style and best practices 2026-03-10 12:25:05 +01:00
ffbd6894c2 feat(RTLP): add premium bypass option to download premium-only content
By default, content with tier 'PREMIUM' is skipped. Use --premium-bypass
flag to download premium movies and episodes.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-10 12:22:24 +01:00
451969150e fix(JOYNAT): fix GraphQL variables encoding 2026-03-02 09:48:42 +01:00
e950d53fe1 fix(RTLP): move client secret to config, use RuntimeError, and fix GraphQL variables encoding 2026-03-02 01:14:44 +01:00
d75533cb25 refactor(RTLP): improve code style and fix logic bugs 2026-03-02 01:02:55 +01:00
5bf50ecba4 refactor(JOYNAT): improve code style and best practices 2026-03-02 00:57:21 +01:00
49a14161cf feat(JOYNAT): add age bypass option 2026-03-02 00:41:11 +01:00
4 changed files with 763 additions and 686 deletions

View File

@@ -1,32 +1,31 @@
from __future__ import annotations
import base64
import hashlib
import json
import re
import secrets
from http.cookiejar import MozillaCookieJar
from typing import Any, Optional, Union
import click
import requests
from devine.core.titles import Episode, Movie, Movies, Series
from devine.core.manifests import DASH
from devine.core.constants import AnyTrack
from devine.core.service import Service
from devine.core.titles import Movies, Series
from devine.core.tracks import Chapter, Tracks
from devine.core.credential import Credential
from devine.core.tracks import Chapters, Tracks, Track
from devine.core.manifests import DASH
from devine.core.service import Service
from devine.core.titles import Episode, Movie, Movies, Series
from devine.core.tracks import Chapter, Chapters, Tracks
import re
import json
import base64
import hashlib
import secrets
class JOYNAT(Service):
"""Joyn Austria (joyn.at) streaming service."""
# List of Service Aliases. Do NOT include the Service Tag. All aliases must be lowercase.
ALIASES = ()
# List of regions of which the service offers support for.
GEOFENCE = ("at")
GEOFENCE = ("at",)
TITLE_RE = r"^https?:\/\/www\.joyn\.at\/(?:play\/)?(?P<type>filme|serien|compilation)\/(?P<content_id>.+)$"
AUTH_CODE_REGEX = r"[&?]code=([^&]+)"
@@ -34,12 +33,14 @@ class JOYNAT(Service):
@staticmethod
@click.command(name="JOYN", short_help="https://joyn.at", help=__doc__)
@click.argument("title", type=str)
@click.option("--age-bypass", is_flag=True, default=False, help="Download age gated videos with a rating of 16 years old or above.")
@click.pass_context
def cli(ctx: click.Context, **kwargs: Any) -> JOYNAT:
return JOYNAT(ctx, **kwargs)
def __init__(self, ctx: click.Context, title: str):
def __init__(self, ctx: click.Context, title: str, age_bypass: bool = False):
self.title = title
self.age_bypass = age_bypass
super().__init__(ctx)
@@ -74,10 +75,14 @@ class JOYNAT(Service):
else:
self._authenticate_with_cookies(cookies)
def generate_code_verifier(self):
@staticmethod
def _generate_code_verifier() -> str:
"""Generate a PKCE code verifier."""
return secrets.token_urlsafe(64)
def generate_code_challenge(self, verifier):
@staticmethod
def _generate_code_challenge(verifier: str) -> str:
"""Generate a PKCE code challenge from a verifier."""
sha256_hash = hashlib.sha256(verifier.encode()).digest()
return base64.urlsafe_b64encode(sha256_hash).decode().rstrip("=")
@@ -87,14 +92,11 @@ class JOYNAT(Service):
redirect_uri = self.config["endpoints"]["redirect_uri"]
client_id = self.config["client"]["id"]
code_verifier = self.generate_code_verifier()
code_challenge = self.generate_code_challenge(code_verifier)
code_verifier = self._generate_code_verifier()
code_challenge = self._generate_code_challenge(code_verifier)
redirect_url_request = self.session.get(
auth_url,
headers={
**self.session.headers,
},
params={
'response_type': 'code',
'scope': 'openid email profile offline_access',
@@ -114,17 +116,15 @@ class JOYNAT(Service):
# Find the auth_code using regex
auth_code_match = re.search(self.AUTH_CODE_REGEX, redirect_url)
if auth_code_match:
auth_code = auth_code_match.group(1)
self.log.debug(f"Auth Code: {auth_code}")
else:
self.log.error("Authorization code not found in redirect URL.")
raise EnvironmentError("Could not find authorization code.")
if not auth_code_match:
raise EnvironmentError("Authorization code not found in redirect URL.")
auth_code = auth_code_match.group(1)
self.log.debug(f"Auth Code: {auth_code}")
response = self.session.post(
token_url,
headers={
**self.session.headers,
'Content-Type': 'application/json'
},
cookies=cookies,
@@ -138,13 +138,12 @@ class JOYNAT(Service):
response.raise_for_status()
auth_response = response.json()
if 'access_token' in auth_response:
self._joyn_auth_jwt = auth_response['access_token']
self.log.info("Successfully authenticated with cookies.")
else:
self.log.error("No access_token found in response.")
if 'access_token' not in auth_response:
raise EnvironmentError("Cookie authentication failed: no access token in response.")
self._joyn_auth_jwt = auth_response['access_token']
self.log.info("Successfully authenticated with cookies.")
def _authenticate_anonymous(self) -> None:
token_url = self.config["endpoints"]["anon_auth_url"]
@@ -154,197 +153,237 @@ class JOYNAT(Service):
response = self.session.post(
token_url,
headers={
**self.session.headers,
'Content-Type': 'application/json'
'Content-Type': 'application/json',
},
json={
'client_id': anon_device_id,
'client_name': client_name,
'anon_device_id': anon_device_id,
},
json={'client_id': anon_device_id, 'client_name': client_name, 'anon_device_id': anon_device_id},
)
response.raise_for_status()
auth_response = response.json()
if 'access_token' in auth_response:
self._joyn_auth_jwt = auth_response['access_token']
self.log.info("Authenticated anonymously with Joyn service successfully.")
else:
self.log.error("No access_token found in response.")
if 'access_token' not in auth_response:
raise EnvironmentError("Anonymous authentication failed: no access token in response.")
self._joyn_auth_jwt = auth_response['access_token']
self.log.info("Authenticated anonymously with Joyn service.")
# Required methods:
def _is_age_restricted(self, data: dict, title_name: str) -> bool:
"""Check if content is age-restricted and should be skipped.
Returns True if the content has an age rating >= 16 and --age-bypass
was not specified.
"""
age_rating = data.get('ageRating') or {}
min_age = age_rating.get('minAge', 0)
if min_age >= 16 and not self.age_bypass:
self.log.warning(f"Skipping '{title_name}' due to age rating ({min_age}+). Use --age-bypass to download.")
return True
return False
@staticmethod
def _validate_required_fields(data: dict, fields: list[str], context: str) -> None:
"""Raise ValueError if any required fields are missing from data."""
missing = [f for f in fields if f not in data]
if missing:
raise ValueError(f"Missing required fields {missing} in {context}.")
def _validate_video_field(self, data: dict, context: str) -> None:
"""Validate that a 'video.id' field exists in the data."""
if 'video' not in data or 'id' not in data['video']:
raise ValueError(f"Missing 'video.id' in {context}.")
# ------------------------------------------------------------------
# Required service methods
# ------------------------------------------------------------------
def get_titles(self) -> Union[Movies, Series]:
graphql_url = self.config["endpoints"]["graphql_url"]
api_key = self.config["client"]["api_key"]
client_name = self.config["client"]["name"]
match = re.match(self.TITLE_RE, self.title)
if not match:
raise ValueError(f"Invalid title URL format: {self.title}")
try:
match = re.match(self.TITLE_RE, self.title)
if not match:
self.log.error(f"Invalid title URL format: {self.title}")
raise ValueError("Invalid title URL format.")
kind = match.group("type")
content_id = match.group("content_id")
except Exception:
raise ValueError("Could not parse ID from title - is the URL correct?")
kind = match.group("type")
content_id = match.group("content_id")
if not kind or not content_id:
self.log.error(f"Invalid title URL: {self.title}. 'kind' or 'content_id' is missing.")
raise ValueError("Invalid title URL: 'kind' or 'content_id' is missing.")
raise ValueError(f"Invalid title URL: {self.title}. 'kind' or 'content_id' is missing.")
if kind == "filme":
path = f'/filme/{content_id}'
return self._get_movie_titles(content_id)
elif kind == "serien":
return self._get_series_titles(content_id)
response_data = self._execute_graphql_query('PageMovieDetailNewStatic', {'path': path}, '7b49493138f2162be230fd0e3fbf5722b1db6700a8842109ed3d98979898707a')
raise ValueError(f"Unsupported content type: '{kind}'.")
if 'page' in response_data and 'movie' in response_data['page']:
movie_data = response_data['page']['movie']
def _get_movie_titles(self, content_id: str) -> Movies:
"""Fetch and return movie title data."""
path = f'/filme/{content_id}'
response_data = self._execute_graphql_query(
'PageMovieDetailNewStatic', {'path': path},
'7b49493138f2162be230fd0e3fbf5722b1db6700a8842109ed3d98979898707a',
)
if 'id' not in movie_data or 'title' not in movie_data or 'productionYear' not in movie_data:
self.log.error("Invalid movie_data data received.")
raise ValueError("Invalid movie_data data received from Joyn service.")
movie_data = response_data.get('page', {}).get('movie')
if not movie_data:
raise ValueError("Failed to fetch movie data from Joyn service.")
if 'video' not in movie_data or 'id' not in movie_data['video']:
self.log.error("Invalid movie_data data received.")
raise ValueError("Invalid movie_data data received from Joyn service.")
self._validate_required_fields(movie_data, ['id', 'title', 'productionYear'], 'movie_data')
self._validate_video_field(movie_data, 'movie_data')
return Movies(
[
Movie(
id_=movie_data['id'],
service=self.__class__,
name=movie_data['title'],
data=movie_data,
year=movie_data['productionYear'],
)
]
)
if self._is_age_restricted(movie_data, movie_data['title']):
return Movies([])
if kind == "serien":
path = f'/serien/{content_id}'
if len(content_id.split("/")) == 1:
response_data = self._execute_graphql_query('SeriesDetailPageStatic', {'path': path}, '43cad327eeae12e14dfb629d662ebc947d78b71ec91d972ea1ef46ccdb29eede')
return Movies([
Movie(
id_=movie_data['id'],
service=self.__class__,
name=movie_data['title'],
data=movie_data,
year=movie_data['productionYear'],
)
])
if 'page' in response_data and 'series' in response_data['page']:
series_data = response_data['page']['series']
def _get_series_titles(self, content_id: str) -> Series:
"""Fetch and return series/episode title data."""
path = f'/serien/{content_id}'
parts = content_id.split("/")
if 'title' not in series_data or 'allSeasons' not in series_data:
self.log.error("Invalid series_data data received.")
raise ValueError("Invalid series_data data received from Joyn service.")
if len(parts) == 1:
return self._get_full_series(path)
elif len(parts) == 2:
return self._get_single_episode(path)
episodes = []
for season in series_data['allSeasons']:
if 'id' not in season or 'number' not in season:
self.log.error("Invalid series_data data received.")
raise ValueError("Invalid series_data data received from Joyn service.")
#
response_data = self._execute_graphql_query('Season', {"id": season["id"]}, 'ee2396bb1b7c9f800e5cefd0b341271b7213fceb4ebe18d5a30dab41d703009f')
raise ValueError(f"Unexpected series URL depth: '{content_id}'.")
if 'season' in response_data:
season_data = response_data['season']
def _get_full_series(self, path: str) -> Series:
"""Fetch all episodes across all seasons of a series."""
response_data = self._execute_graphql_query(
'SeriesDetailPageStatic', {'path': path},
'43cad327eeae12e14dfb629d662ebc947d78b71ec91d972ea1ef46ccdb29eede',
)
if 'episodes' not in season_data:
self.log.error("Invalid season_data data received.")
raise ValueError("Invalid season_data data received from Joyn service.")
series_data = response_data.get('page', {}).get('series')
if not series_data:
raise ValueError("Failed to fetch series data from Joyn service.")
for episode in season_data['episodes']:
if 'id' not in episode or 'title' not in episode or 'number' not in episode:
self.log.error("Invalid episode data received.")
raise ValueError("Invalid episode data received from Joyn service.")
self._validate_required_fields(series_data, ['title', 'allSeasons'], 'series_data')
if 'video' not in episode or 'id' not in episode['video']:
self.log.error("Invalid episode data received.")
raise ValueError("Invalid episode data received from Joyn service.")
episodes = []
for season in series_data['allSeasons']:
self._validate_required_fields(season, ['id', 'number'], 'season')
episodes.append(
Episode(
id_=episode['id'],
service=self.__class__,
title=series_data['title'],
season=season['number'],
number=episode['number'],
name=episode['title'],
data=episode,
)
)
season_response = self._execute_graphql_query(
'Season', {'id': season['id']},
'ee2396bb1b7c9f800e5cefd0b341271b7213fceb4ebe18d5a30dab41d703009f',
)
return Series(episodes)
season_data = season_response.get('season')
if not season_data:
continue
elif len(content_id.split("/")) == 2:
response_data = self._execute_graphql_query('EpisodeDetailPageStatic', {'path': path}, 'c4bcacee94d38133e87879dad8d69bd8a74c7326262a1848cceb964b871c1551')
self._validate_required_fields(season_data, ['episodes'], 'season_data')
if 'page' in response_data and 'episode' in response_data['page']:
episode_data = response_data['page']['episode']
for episode in season_data['episodes']:
self._validate_required_fields(episode, ['id', 'title', 'number'], 'episode')
self._validate_video_field(episode, 'episode')
if 'id' not in episode_data or 'title' not in episode_data or 'number' not in episode_data:
self.log.error("Invalid episode_data data received.")
raise ValueError("Invalid episode_data data received from Joyn service.")
if self._is_age_restricted(episode, episode['title']):
continue
if 'season' not in episode_data or 'number' not in episode_data['season']:
self.log.error("Invalid episode_data data received.")
raise ValueError("Invalid episode_data data received from Joyn service.")
episodes.append(Episode(
id_=episode['id'],
service=self.__class__,
title=series_data['title'],
season=season['number'],
number=episode['number'],
name=episode['title'],
data=episode,
))
if 'series' not in episode_data or 'title' not in episode_data['series']:
self.log.error("Invalid episode_data data received.")
raise ValueError("Invalid episode_data data received from Joyn service.")
return Series(episodes)
if 'video' not in episode_data or 'id' not in episode_data['video']:
self.log.error("Invalid episode_data data received.")
raise ValueError("Invalid episode_data data received from Joyn service.")
def _get_single_episode(self, path: str) -> Series:
"""Fetch a single episode by its direct URL."""
response_data = self._execute_graphql_query(
'EpisodeDetailPageStatic', {'path': path},
'c4bcacee94d38133e87879dad8d69bd8a74c7326262a1848cceb964b871c1551',
)
return Series(
[
Episode(
id_=episode_data['id'],
service=self.__class__,
title=episode_data['series']['title'],
season=episode_data['season']['number'],
number=episode_data['number'],
name=episode_data['title'],
data=episode_data,
)
]
)
episode_data = response_data.get('page', {}).get('episode')
if not episode_data:
raise ValueError("Failed to fetch episode data from Joyn service.")
self.log.error(f"Failed to fetch Movie data: {response.status_code} - {response.text}")
raise EnvironmentError("Failed to fetch Movie data from Joyn service.")
self._validate_required_fields(episode_data, ['id', 'title', 'number'], 'episode_data')
self._validate_required_fields(episode_data.get('season', {}), ['number'], 'episode_data.season')
self._validate_required_fields(episode_data.get('series', {}), ['title'], 'episode_data.series')
self._validate_video_field(episode_data, 'episode_data')
if self._is_age_restricted(episode_data, episode_data['title']):
return Series([])
return Series([
Episode(
id_=episode_data['id'],
service=self.__class__,
title=episode_data['series']['title'],
season=episode_data['season']['number'],
number=episode_data['number'],
name=episode_data['title'],
data=episode_data,
)
])
def get_tracks(self, title: Union[Episode, Movie]) -> Tracks:
entitlement_url = self.config["endpoints"]["entitlement"]
playout_url = self.config["endpoints"]["playout"]
if not isinstance(title, Episode) and not isinstance(title, Movie):
self.log.error(f"Unsupported title type: {type(title)}. Expected Series or Movies.")
raise ValueError(f"Unsupported title type: {type(title)}. Expected Series or Movies.")
if not isinstance(title, (Episode, Movie)):
raise TypeError(f"Expected Episode or Movie, got {type(title).__name__}.")
content_id = title.data['video']['id']
entitlement = self.session.post(
entitlement_url,
headers={
**self.session.headers,
'Authorization': f'Bearer {self._joyn_auth_jwt}'
},
json={'content_id': content_id, 'content_type': 'VOD'}
)
entitlement.raise_for_status()
entitlement_data = entitlement.json()
if 'entitlement_token' not in entitlement_data:
self.log.error(f"Failed to fetch tracks entitlement: 'entitlement_token' not in entitlement_data")
raise EnvironmentError("Failed to fetch tracks entitlement from Joyn service.")
# Step 1: Obtain entitlement token
entitlement_data = self._fetch_entitlement(content_id)
entitlement_token = entitlement_data['entitlement_token']
playlist = self.session.post(
playout_url.format(content_id=content_id),
# Step 2: Obtain playlist/manifest
playlist_data = self._fetch_playlist(content_id, entitlement_token)
manifest_url = playlist_data['manifestUrl']
license_url = playlist_data.get('licenseUrl')
# Step 3: Parse DASH manifest into tracks
all_tracks = DASH.from_url(manifest_url, self.session).to_tracks(language="de")
for track in all_tracks:
if track.data is None:
track.data = {}
track.data['license_url'] = license_url
return Tracks(all_tracks)
def _fetch_entitlement(self, content_id: str) -> dict:
"""Request an entitlement token for the given content."""
response = self.session.post(
self.config["endpoints"]["entitlement"],
headers={
**self.session.headers,
'Authorization': f'Bearer {entitlement_token}'
'Authorization': f'Bearer {self._joyn_auth_jwt}',
},
json={'content_id': content_id, 'content_type': 'VOD'},
)
response.raise_for_status()
data = response.json()
if 'entitlement_token' not in data:
raise EnvironmentError("Entitlement response missing 'entitlement_token'.")
return data
def _fetch_playlist(self, content_id: str, entitlement_token: str) -> dict:
"""Request the playout/manifest URL for the given content."""
response = self.session.post(
self.config["endpoints"]["playout"].format(content_id=content_id),
headers={
'Authorization': f'Bearer {entitlement_token}',
},
json={
'manufacturer': 'unknown',
@@ -357,31 +396,14 @@ class JOYNAT(Service):
'enableSubtitles': True,
'maxResolution': 1080,
'variantName': 'default',
}
},
)
playlist.raise_for_status()
response.raise_for_status()
playlist_data = playlist.json()
if not 'manifestUrl' in playlist_data:
self.log.error(f"Failed to fetch tracks playlist: 'manifestUrl' not in entitlement_data")
raise EnvironmentError("Failed to fetch tracks playlist from Joyn service.")
manifest_url = playlist_data['manifestUrl']
# Get license_url or set to None if not present
license_url = playlist_data.get('licenseUrl', None)
all_tracks = DASH.from_url(manifest_url, self.session).to_tracks(language="de")
# Attach license_url to each track's data dictionary
for tr in all_tracks:
if tr.data is None:
tr.data = {}
tr.data['license_url'] = license_url
# Return a new Tracks object containing all collected tracks
return Tracks(all_tracks)
data = response.json()
if 'manifestUrl' not in data:
raise EnvironmentError("Playlist response missing 'manifestUrl'.")
return data
def get_chapters(self, title: Union[Movies, Series]) -> list[Chapter]:
# technically optional, but you must define and at least `return []`.
@@ -390,39 +412,36 @@ class JOYNAT(Service):
def get_widevine_service_certificate(self, *, challenge: bytes, title: Union[Movies, Series], track: AnyTrack) -> Union[bytes, str]:
return None
def get_widevine_license(self, *, challenge: bytes, title: Union[Movies, Series], track: AnyTrack) -> Optional[Union[bytes, str]]:
# Safely extract license_url from track.data
if hasattr(track, "data") and track.data:
license_url = track.data.get("license_url")
def get_widevine_license(
self, *, challenge: bytes, title: Union[Movies, Series], track: AnyTrack,
) -> Optional[Union[bytes, str]]:
"""Obtain a Widevine license for the given track."""
license_url = track.data.get('license_url')
if not license_url:
raise ValueError("No license_url in track.data")
raise ValueError("No license_url in track.data.")
response = self.session.post(
license_url,
headers={
**self.session.headers,
'Content-Type': 'application/octet-stream',
'x-auth-token': self._joyn_auth_jwt
'x-auth-token': self._joyn_auth_jwt,
},
data=challenge
data=challenge,
)
if response.status_code != 200:
self.log.error(f"Failed to fetch license: {response.status_code} - {response.text}")
raise ConnectionError(response.text)
response.raise_for_status()
self.log.info("Successfully fetched Widevine license from Joyn service.")
self.log.info("Successfully fetched Widevine license.")
return response.content
def _execute_graphql_query(self, operation_name: str, variables: dict, persisted_query_hash: str) -> dict:
response = self.session.get(
self.config["endpoints"]["graphql_url"],
headers={
**self.session.headers,
'X-Api-Key': self.config["client"]["api_key"]
},
params={
'operationName': operation_name,
'variables': json.dumps(variables).encode(),
'variables': json.dumps(variables),
'extensions': json.dumps({
'persistedQuery': {'version': 1, 'sha256Hash': persisted_query_hash}
}).encode()
@@ -432,6 +451,9 @@ class JOYNAT(Service):
response_data = response.json()
if response_data.get("errors"):
raise ValueError(f"GraphQL errors for '{operation_name}': {response_data['errors']}")
if 'data' not in response_data:
self.log.error(f"GraphQL response for '{operation_name}' missing 'data' field.")
raise ValueError(f"Invalid GraphQL response for '{operation_name}'.")

View File

@@ -1,32 +1,31 @@
from __future__ import annotations
import base64
import hashlib
import json
import re
import secrets
from http.cookiejar import MozillaCookieJar
from typing import Any, Optional, Union
import click
import requests
from devine.core.titles import Episode, Movie, Movies, Series
from devine.core.manifests import DASH
from devine.core.constants import AnyTrack
from devine.core.service import Service
from devine.core.titles import Movies, Series
from devine.core.tracks import Chapter, Tracks
from devine.core.credential import Credential
from devine.core.tracks import Chapters, Tracks, Track
from devine.core.manifests import DASH
from devine.core.service import Service
from devine.core.titles import Episode, Movie, Movies, Series
from devine.core.tracks import Chapter, Chapters, Tracks
import re
import json
import base64
import hashlib
import secrets
class JOYNDE(Service):
"""Joyn Germmany (joyn.de) streaming service."""
# List of Service Aliases. Do NOT include the Service Tag. All aliases must be lowercase.
ALIASES = ()
# List of regions of which the service offers support for.
GEOFENCE = ("de")
GEOFENCE = ("de",)
TITLE_RE = r"^https?:\/\/www\.joyn\.de\/(?:play\/)?(?P<type>filme|serien|compilation)\/(?P<content_id>.+)$"
AUTH_CODE_REGEX = r"[&?]code=([^&]+)"
@@ -34,12 +33,14 @@ class JOYNDE(Service):
@staticmethod
@click.command(name="JOYN", short_help="https://joyn.de", help=__doc__)
@click.argument("title", type=str)
@click.option("--age-bypass", is_flag=True, default=False, help="Download age gated videos with a rating of 16 years old or above.")
@click.pass_context
def cli(ctx: click.Context, **kwargs: Any) -> JOYNDE:
return JOYNDE(ctx, **kwargs)
def __init__(self, ctx: click.Context, title: str):
def __init__(self, ctx: click.Context, title: str, age_bypass: bool = False):
self.title = title
self.age_bypass = age_bypass
super().__init__(ctx)
@@ -59,7 +60,7 @@ class JOYNDE(Service):
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36 Edg/137.0.0.0',
'Joyn-Platform': 'web',
'Joyn-Distribution-Tenant': 'JOYN_DE',
'Joyn-Country': 'DE',
'Joyn-Country': 'AT',
'Joyn-Client-Version': '5.1370.1',
})
@@ -74,10 +75,14 @@ class JOYNDE(Service):
else:
self._authenticate_with_cookies(cookies)
def generate_code_verifier(self):
@staticmethod
def _generate_code_verifier() -> str:
"""Generate a PKCE code verifier."""
return secrets.token_urlsafe(64)
def generate_code_challenge(self, verifier):
@staticmethod
def _generate_code_challenge(verifier: str) -> str:
"""Generate a PKCE code challenge from a verifier."""
sha256_hash = hashlib.sha256(verifier.encode()).digest()
return base64.urlsafe_b64encode(sha256_hash).decode().rstrip("=")
@@ -87,14 +92,11 @@ class JOYNDE(Service):
redirect_uri = self.config["endpoints"]["redirect_uri"]
client_id = self.config["client"]["id"]
code_verifier = self.generate_code_verifier()
code_challenge = self.generate_code_challenge(code_verifier)
code_verifier = self._generate_code_verifier()
code_challenge = self._generate_code_challenge(code_verifier)
redirect_url_request = self.session.get(
auth_url,
headers={
**self.session.headers,
},
params={
'response_type': 'code',
'scope': 'openid email profile offline_access',
@@ -114,17 +116,15 @@ class JOYNDE(Service):
# Find the auth_code using regex
auth_code_match = re.search(self.AUTH_CODE_REGEX, redirect_url)
if auth_code_match:
auth_code = auth_code_match.group(1)
self.log.debug(f"Auth Code: {auth_code}")
else:
self.log.error("Authorization code not found in redirect URL.")
raise EnvironmentError("Could not find authorization code.")
if not auth_code_match:
raise EnvironmentError("Authorization code not found in redirect URL.")
auth_code = auth_code_match.group(1)
self.log.debug(f"Auth Code: {auth_code}")
response = self.session.post(
token_url,
headers={
**self.session.headers,
'Content-Type': 'application/json'
},
cookies=cookies,
@@ -138,13 +138,12 @@ class JOYNDE(Service):
response.raise_for_status()
auth_response = response.json()
if 'access_token' in auth_response:
self._joyn_auth_jwt = auth_response['access_token']
self.log.info("Successfully authenticated with cookies.")
else:
self.log.error("No access_token found in response.")
if 'access_token' not in auth_response:
raise EnvironmentError("Cookie authentication failed: no access token in response.")
self._joyn_auth_jwt = auth_response['access_token']
self.log.info("Successfully authenticated with cookies.")
def _authenticate_anonymous(self) -> None:
token_url = self.config["endpoints"]["anon_auth_url"]
@@ -154,197 +153,237 @@ class JOYNDE(Service):
response = self.session.post(
token_url,
headers={
**self.session.headers,
'Content-Type': 'application/json'
'Content-Type': 'application/json',
},
json={
'client_id': anon_device_id,
'client_name': client_name,
'anon_device_id': anon_device_id,
},
json={'client_id': anon_device_id, 'client_name': client_name, 'anon_device_id': anon_device_id},
)
response.raise_for_status()
auth_response = response.json()
if 'access_token' in auth_response:
self._joyn_auth_jwt = auth_response['access_token']
self.log.info("Authenticated anonymously with Joyn service successfully.")
else:
self.log.error("No access_token found in response.")
if 'access_token' not in auth_response:
raise EnvironmentError("Anonymous authentication failed: no access token in response.")
self._joyn_auth_jwt = auth_response['access_token']
self.log.info("Authenticated anonymously with Joyn service.")
# Required methods:
def _is_age_restricted(self, data: dict, title_name: str) -> bool:
"""Check if content is age-restricted and should be skipped.
Returns True if the content has an age rating >= 16 and --age-bypass
was not specified.
"""
age_rating = data.get('ageRating') or {}
min_age = age_rating.get('minAge', 0)
if min_age >= 16 and not self.age_bypass:
self.log.warning(f"Skipping '{title_name}' due to age rating ({min_age}+). Use --age-bypass to download.")
return True
return False
@staticmethod
def _validate_required_fields(data: dict, fields: list[str], context: str) -> None:
"""Raise ValueError if any required fields are missing from data."""
missing = [f for f in fields if f not in data]
if missing:
raise ValueError(f"Missing required fields {missing} in {context}.")
def _validate_video_field(self, data: dict, context: str) -> None:
"""Validate that a 'video.id' field exists in the data."""
if 'video' not in data or 'id' not in data['video']:
raise ValueError(f"Missing 'video.id' in {context}.")
# ------------------------------------------------------------------
# Required service methods
# ------------------------------------------------------------------
def get_titles(self) -> Union[Movies, Series]:
graphql_url = self.config["endpoints"]["graphql_url"]
api_key = self.config["client"]["api_key"]
client_name = self.config["client"]["name"]
match = re.match(self.TITLE_RE, self.title)
if not match:
raise ValueError(f"Invalid title URL format: {self.title}")
try:
match = re.match(self.TITLE_RE, self.title)
if not match:
self.log.error(f"Invalid title URL format: {self.title}")
raise ValueError("Invalid title URL format.")
kind = match.group("type")
content_id = match.group("content_id")
except Exception:
raise ValueError("Could not parse ID from title - is the URL correct?")
kind = match.group("type")
content_id = match.group("content_id")
if not kind or not content_id:
self.log.error(f"Invalid title URL: {self.title}. 'kind' or 'content_id' is missing.")
raise ValueError("Invalid title URL: 'kind' or 'content_id' is missing.")
raise ValueError(f"Invalid title URL: {self.title}. 'kind' or 'content_id' is missing.")
if kind == "filme":
path = f'/filme/{content_id}'
return self._get_movie_titles(content_id)
elif kind == "serien":
return self._get_series_titles(content_id)
response_data = self._execute_graphql_query('PageMovieDetailNewStatic', {'path': path}, '7b49493138f2162be230fd0e3fbf5722b1db6700a8842109ed3d98979898707a')
raise ValueError(f"Unsupported content type: '{kind}'.")
if 'page' in response_data and 'movie' in response_data['page']:
movie_data = response_data['page']['movie']
def _get_movie_titles(self, content_id: str) -> Movies:
"""Fetch and return movie title data."""
path = f'/filme/{content_id}'
response_data = self._execute_graphql_query(
'PageMovieDetailNewStatic', {'path': path},
'7b49493138f2162be230fd0e3fbf5722b1db6700a8842109ed3d98979898707a',
)
if 'id' not in movie_data or 'title' not in movie_data or 'productionYear' not in movie_data:
self.log.error("Invalid movie_data data received.")
raise ValueError("Invalid movie_data data received from Joyn service.")
movie_data = response_data.get('page', {}).get('movie')
if not movie_data:
raise ValueError("Failed to fetch movie data from Joyn service.")
if 'video' not in movie_data or 'id' not in movie_data['video']:
self.log.error("Invalid movie_data data received.")
raise ValueError("Invalid movie_data data received from Joyn service.")
self._validate_required_fields(movie_data, ['id', 'title', 'productionYear'], 'movie_data')
self._validate_video_field(movie_data, 'movie_data')
return Movies(
[
Movie(
id_=movie_data['id'],
service=self.__class__,
name=movie_data['title'],
data=movie_data,
year=movie_data['productionYear'],
)
]
)
if self._is_age_restricted(movie_data, movie_data['title']):
return Movies([])
if kind == "serien":
path = f'/serien/{content_id}'
if len(content_id.split("/")) == 1:
response_data = self._execute_graphql_query('SeriesDetailPageStatic', {'path': path}, '43cad327eeae12e14dfb629d662ebc947d78b71ec91d972ea1ef46ccdb29eede')
return Movies([
Movie(
id_=movie_data['id'],
service=self.__class__,
name=movie_data['title'],
data=movie_data,
year=movie_data['productionYear'],
)
])
if 'page' in response_data and 'series' in response_data['page']:
series_data = response_data['page']['series']
def _get_series_titles(self, content_id: str) -> Series:
"""Fetch and return series/episode title data."""
path = f'/serien/{content_id}'
parts = content_id.split("/")
if 'title' not in series_data or 'allSeasons' not in series_data:
self.log.error("Invalid series_data data received.")
raise ValueError("Invalid series_data data received from Joyn service.")
if len(parts) == 1:
return self._get_full_series(path)
elif len(parts) == 2:
return self._get_single_episode(path)
episodes = []
for season in series_data['allSeasons']:
if 'id' not in season or 'number' not in season:
self.log.error("Invalid series_data data received.")
raise ValueError("Invalid series_data data received from Joyn service.")
#
response_data = self._execute_graphql_query('Season', {"id": season["id"]}, 'ee2396bb1b7c9f800e5cefd0b341271b7213fceb4ebe18d5a30dab41d703009f')
raise ValueError(f"Unexpected series URL depth: '{content_id}'.")
if 'season' in response_data:
season_data = response_data['season']
def _get_full_series(self, path: str) -> Series:
"""Fetch all episodes across all seasons of a series."""
response_data = self._execute_graphql_query(
'SeriesDetailPageStatic', {'path': path},
'43cad327eeae12e14dfb629d662ebc947d78b71ec91d972ea1ef46ccdb29eede',
)
if 'episodes' not in season_data:
self.log.error("Invalid season_data data received.")
raise ValueError("Invalid season_data data received from Joyn service.")
series_data = response_data.get('page', {}).get('series')
if not series_data:
raise ValueError("Failed to fetch series data from Joyn service.")
for episode in season_data['episodes']:
if 'id' not in episode or 'title' not in episode or 'number' not in episode:
self.log.error("Invalid episode data received.")
raise ValueError("Invalid episode data received from Joyn service.")
self._validate_required_fields(series_data, ['title', 'allSeasons'], 'series_data')
if 'video' not in episode or 'id' not in episode['video']:
self.log.error("Invalid episode data received.")
raise ValueError("Invalid episode data received from Joyn service.")
episodes = []
for season in series_data['allSeasons']:
self._validate_required_fields(season, ['id', 'number'], 'season')
episodes.append(
Episode(
id_=episode['id'],
service=self.__class__,
title=series_data['title'],
season=season['number'],
number=episode['number'],
name=episode['title'],
data=episode,
)
)
season_response = self._execute_graphql_query(
'Season', {'id': season['id']},
'ee2396bb1b7c9f800e5cefd0b341271b7213fceb4ebe18d5a30dab41d703009f',
)
return Series(episodes)
season_data = season_response.get('season')
if not season_data:
continue
elif len(content_id.split("/")) == 2:
response_data = self._execute_graphql_query('EpisodeDetailPageStatic', {'path': path}, 'c4bcacee94d38133e87879dad8d69bd8a74c7326262a1848cceb964b871c1551')
self._validate_required_fields(season_data, ['episodes'], 'season_data')
if 'page' in response_data and 'episode' in response_data['page']:
episode_data = response_data['page']['episode']
for episode in season_data['episodes']:
self._validate_required_fields(episode, ['id', 'title', 'number'], 'episode')
self._validate_video_field(episode, 'episode')
if 'id' not in episode_data or 'title' not in episode_data or 'number' not in episode_data:
self.log.error("Invalid episode_data data received.")
raise ValueError("Invalid episode_data data received from Joyn service.")
if self._is_age_restricted(episode, episode['title']):
continue
if 'season' not in episode_data or 'number' not in episode_data['season']:
self.log.error("Invalid episode_data data received.")
raise ValueError("Invalid episode_data data received from Joyn service.")
episodes.append(Episode(
id_=episode['id'],
service=self.__class__,
title=series_data['title'],
season=season['number'],
number=episode['number'],
name=episode['title'],
data=episode,
))
if 'series' not in episode_data or 'title' not in episode_data['series']:
self.log.error("Invalid episode_data data received.")
raise ValueError("Invalid episode_data data received from Joyn service.")
return Series(episodes)
if 'video' not in episode_data or 'id' not in episode_data['video']:
self.log.error("Invalid episode_data data received.")
raise ValueError("Invalid episode_data data received from Joyn service.")
def _get_single_episode(self, path: str) -> Series:
"""Fetch a single episode by its direct URL."""
response_data = self._execute_graphql_query(
'EpisodeDetailPageStatic', {'path': path},
'c4bcacee94d38133e87879dad8d69bd8a74c7326262a1848cceb964b871c1551',
)
return Series(
[
Episode(
id_=episode_data['id'],
service=self.__class__,
title=episode_data['series']['title'],
season=episode_data['season']['number'],
number=episode_data['number'],
name=episode_data['title'],
data=episode_data,
)
]
)
episode_data = response_data.get('page', {}).get('episode')
if not episode_data:
raise ValueError("Failed to fetch episode data from Joyn service.")
self.log.error(f"Failed to fetch Movie data: {response.status_code} - {response.text}")
raise EnvironmentError("Failed to fetch Movie data from Joyn service.")
self._validate_required_fields(episode_data, ['id', 'title', 'number'], 'episode_data')
self._validate_required_fields(episode_data.get('season', {}), ['number'], 'episode_data.season')
self._validate_required_fields(episode_data.get('series', {}), ['title'], 'episode_data.series')
self._validate_video_field(episode_data, 'episode_data')
if self._is_age_restricted(episode_data, episode_data['title']):
return Series([])
return Series([
Episode(
id_=episode_data['id'],
service=self.__class__,
title=episode_data['series']['title'],
season=episode_data['season']['number'],
number=episode_data['number'],
name=episode_data['title'],
data=episode_data,
)
])
def get_tracks(self, title: Union[Episode, Movie]) -> Tracks:
entitlement_url = self.config["endpoints"]["entitlement"]
playout_url = self.config["endpoints"]["playout"]
if not isinstance(title, Episode) and not isinstance(title, Movie):
self.log.error(f"Unsupported title type: {type(title)}. Expected Series or Movies.")
raise ValueError(f"Unsupported title type: {type(title)}. Expected Series or Movies.")
if not isinstance(title, (Episode, Movie)):
raise TypeError(f"Expected Episode or Movie, got {type(title).__name__}.")
content_id = title.data['video']['id']
entitlement = self.session.post(
entitlement_url,
headers={
**self.session.headers,
'Authorization': f'Bearer {self._joyn_auth_jwt}'
},
json={'content_id': content_id, 'content_type': 'VOD'}
)
entitlement.raise_for_status()
entitlement_data = entitlement.json()
if 'entitlement_token' not in entitlement_data:
self.log.error(f"Failed to fetch tracks entitlement: 'entitlement_token' not in entitlement_data")
raise EnvironmentError("Failed to fetch tracks entitlement from Joyn service.")
# Step 1: Obtain entitlement token
entitlement_data = self._fetch_entitlement(content_id)
entitlement_token = entitlement_data['entitlement_token']
playlist = self.session.post(
playout_url.format(content_id=content_id),
# Step 2: Obtain playlist/manifest
playlist_data = self._fetch_playlist(content_id, entitlement_token)
manifest_url = playlist_data['manifestUrl']
license_url = playlist_data.get('licenseUrl')
# Step 3: Parse DASH manifest into tracks
all_tracks = DASH.from_url(manifest_url, self.session).to_tracks(language="de")
for track in all_tracks:
if track.data is None:
track.data = {}
track.data['license_url'] = license_url
return Tracks(all_tracks)
def _fetch_entitlement(self, content_id: str) -> dict:
"""Request an entitlement token for the given content."""
response = self.session.post(
self.config["endpoints"]["entitlement"],
headers={
**self.session.headers,
'Authorization': f'Bearer {entitlement_token}'
'Authorization': f'Bearer {self._joyn_auth_jwt}',
},
json={'content_id': content_id, 'content_type': 'VOD'},
)
response.raise_for_status()
data = response.json()
if 'entitlement_token' not in data:
raise EnvironmentError("Entitlement response missing 'entitlement_token'.")
return data
def _fetch_playlist(self, content_id: str, entitlement_token: str) -> dict:
"""Request the playout/manifest URL for the given content."""
response = self.session.post(
self.config["endpoints"]["playout"].format(content_id=content_id),
headers={
'Authorization': f'Bearer {entitlement_token}',
},
json={
'manufacturer': 'unknown',
@@ -357,31 +396,14 @@ class JOYNDE(Service):
'enableSubtitles': True,
'maxResolution': 1080,
'variantName': 'default',
}
},
)
playlist.raise_for_status()
response.raise_for_status()
playlist_data = playlist.json()
if not 'manifestUrl' in playlist_data:
self.log.error(f"Failed to fetch tracks playlist: 'manifestUrl' not in entitlement_data")
raise EnvironmentError("Failed to fetch tracks playlist from Joyn service.")
manifest_url = playlist_data['manifestUrl']
# Get license_url or set to None if not present
license_url = playlist_data.get('licenseUrl', None)
all_tracks = DASH.from_url(manifest_url, self.session).to_tracks(language="de")
# Attach license_url to each track's data dictionary
for tr in all_tracks:
if tr.data is None:
tr.data = {}
tr.data['license_url'] = license_url
# Return a new Tracks object containing all collected tracks
return Tracks(all_tracks)
data = response.json()
if 'manifestUrl' not in data:
raise EnvironmentError("Playlist response missing 'manifestUrl'.")
return data
def get_chapters(self, title: Union[Movies, Series]) -> list[Chapter]:
# technically optional, but you must define and at least `return []`.
@@ -390,39 +412,36 @@ class JOYNDE(Service):
def get_widevine_service_certificate(self, *, challenge: bytes, title: Union[Movies, Series], track: AnyTrack) -> Union[bytes, str]:
return None
def get_widevine_license(self, *, challenge: bytes, title: Union[Movies, Series], track: AnyTrack) -> Optional[Union[bytes, str]]:
# Safely extract license_url from track.data
if hasattr(track, "data") and track.data:
license_url = track.data.get("license_url")
def get_widevine_license(
self, *, challenge: bytes, title: Union[Movies, Series], track: AnyTrack,
) -> Optional[Union[bytes, str]]:
"""Obtain a Widevine license for the given track."""
license_url = track.data.get('license_url')
if not license_url:
raise ValueError("No license_url in track.data")
raise ValueError("No license_url in track.data.")
response = self.session.post(
license_url,
headers={
**self.session.headers,
'Content-Type': 'application/octet-stream',
'x-auth-token': self._joyn_auth_jwt
'x-auth-token': self._joyn_auth_jwt,
},
data=challenge
data=challenge,
)
if response.status_code != 200:
self.log.error(f"Failed to fetch license: {response.status_code} - {response.text}")
raise ConnectionError(response.text)
response.raise_for_status()
self.log.info("Successfully fetched Widevine license from Joyn service.")
self.log.info("Successfully fetched Widevine license.")
return response.content
def _execute_graphql_query(self, operation_name: str, variables: dict, persisted_query_hash: str) -> dict:
response = self.session.get(
self.config["endpoints"]["graphql_url"],
headers={
**self.session.headers,
'X-Api-Key': self.config["client"]["api_key"]
},
params={
'operationName': operation_name,
'variables': json.dumps(variables).encode(),
'variables': json.dumps(variables),
'extensions': json.dumps({
'persistedQuery': {'version': 1, 'sha256Hash': persisted_query_hash}
}).encode()
@@ -432,6 +451,9 @@ class JOYNDE(Service):
response_data = response.json()
if response_data.get("errors"):
raise ValueError(f"GraphQL errors for '{operation_name}': {response_data['errors']}")
if 'data' not in response_data:
self.log.error(f"GraphQL response for '{operation_name}' missing 'data' field.")
raise ValueError(f"Invalid GraphQL response for '{operation_name}'.")

View File

@@ -1,30 +1,32 @@
from __future__ import annotations
import base64
import hashlib
import json
import re
import secrets
from http.cookiejar import MozillaCookieJar
from typing import Any, Optional, Union
import click
import requests
from devine.core.titles import Episode, Movie, Movies, Series
from devine.core.manifests import DASH
from devine.core.constants import AnyTrack
from devine.core.service import Service
from devine.core.credential import Credential
from devine.core.tracks import Chapters, Tracks, Track
from devine.core.manifests import DASH
from devine.core.service import Service
from devine.core.titles import Episode, Movie, Movies, Series
from devine.core.tracks import Chapters, Tracks
import re
import json
import base64
import hashlib
import secrets
class RTLP(Service):
"""RTL+ (plus.rtl.de) streaming service."""
# List of Service Aliases. Do NOT include the Service Tag. All aliases must be lowercase.
ALIASES = ()
# List of regions of which the service offers support for.
GEOFENCE = ("de", "at")
GEOFENCE = ("de", "at", "ch")
TITLE_RE = r"^https?:\/\/plus\.rtl\.de\/video-tv\/(?P<kind>shows|serien|filme)\/(?:[^\/]+-)?(?P<show_id>\d+)(?:\/[^\/]+-)?(?P<season_id>\d+)?(?:\/[^\/]+-)?(?P<episode_id>\d+)?$"
AUTH_CODE_REGEX = r"code=([\w-]+\.[\w-]+\.[\w-]+)"
@@ -32,23 +34,19 @@ class RTLP(Service):
@staticmethod
@click.command(name="RTLP", short_help="https://plus.rtl.de", help=__doc__)
@click.argument("title", type=str)
@click.option("--premium-bypass", is_flag=True, default=False, help="Download premium-only content (default: skip premium content).")
@click.pass_context
def cli(ctx: click.Context, **kwargs: Any) -> RTLP:
return RTLP(ctx, **kwargs)
def __init__(self, ctx: click.Context, title: str):
def __init__(self, ctx: click.Context, title: str, premium_bypass: bool = False):
self.title = title
self.premium_bypass = premium_bypass
super().__init__(ctx)
def get_session(self) -> requests.Session:
# modify the creation of the requests session (stored as self.session)
# make a super() call to take the original result and further modify it,
# or don't to make a completely fresh one if required.
session = super().get_session()
# Set default headers as specified
session.headers.update({
'Accept': '*/*',
'Accept-Language': 'de,de-DE;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
@@ -56,7 +54,7 @@ class RTLP(Service):
'Referer': 'https://plus.rtl.de/',
'Rtlplus-Client-Id': 'rci:rtlplus:web',
'Rtlplus-Client-Version': '2024.7.29.2',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36 Edg/137.0.0.0'
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36 Edg/137.0.0.0',
})
return session
@@ -66,14 +64,21 @@ class RTLP(Service):
if not cookies:
self._authenticate_anonymous()
else:
self._authenticate_with_cookies(cookies)
def generate_code_verifier(self):
# ------------------------------------------------------------------
# Authentication helpers
# ------------------------------------------------------------------
@staticmethod
def _generate_code_verifier() -> str:
"""Generate a PKCE code verifier."""
return secrets.token_urlsafe(64)
def generate_code_challenge(self, verifier):
@staticmethod
def _generate_code_challenge(verifier: str) -> str:
"""Generate a PKCE code challenge from a verifier."""
sha256_hash = hashlib.sha256(verifier.encode()).digest()
return base64.urlsafe_b64encode(sha256_hash).decode().rstrip("=")
@@ -81,21 +86,18 @@ class RTLP(Service):
auth_url = self.config["endpoints"]["auth_url"]
token_url = self.config["endpoints"]["token_url"]
code_verifier = self.generate_code_verifier()
code_challenge = self.generate_code_challenge(code_verifier)
code_verifier = self._generate_code_verifier()
code_challenge = self._generate_code_challenge(code_verifier)
redirect_url_request = self.session.get(
auth_url,
headers={
**self.session.headers,
},
params={
'client_id': 'rtlplus-web',
'redirect_uri': 'https://plus.rtl.de/silent-check-sso.html',
'response_type': 'code',
'scope': 'openid',
'code_challenge_method': 'S256',
'code_challenge': code_challenge
'code_challenge': code_challenge,
},
cookies=cookies,
)
@@ -104,355 +106,385 @@ class RTLP(Service):
redirect_url = redirect_url_request.url
auth_code_match = re.search(self.AUTH_CODE_REGEX, redirect_url)
if auth_code_match:
auth_code = auth_code_match.group(1)
self.log.debug(f"Auth Code: {auth_code}")
else:
self.log.error("Authorization code not found in redirect URL.")
raise EnvironmentError("Could not find authorization code.")
if not auth_code_match:
raise RuntimeError("Authorization code not found in redirect URL.")
auth_code = auth_code_match.group(1)
self.log.debug(f"Auth Code: {auth_code}")
response = self.session.post(
token_url,
headers={
**self.session.headers,
'Content-Type': 'application/x-www-form-urlencoded'
'Content-Type': 'application/x-www-form-urlencoded',
},
cookies=cookies,
data=bytes(f'grant_type=authorization_code&client_id=rtlplus-web&redirect_uri=https%3A%2F%2Fplus.rtl.de%2Fsilent-check-sso.html&code={auth_code}&code_verifier={code_verifier}', 'utf-8'),
data=bytes(
f'grant_type=authorization_code&client_id=rtlplus-web'
f'&redirect_uri=https%3A%2F%2Fplus.rtl.de%2Fsilent-check-sso.html'
f'&code={auth_code}&code_verifier={code_verifier}',
'utf-8',
),
)
response.raise_for_status()
auth_response = response.json()
if 'access_token' in auth_response:
self._rtlp_auth_jwt = auth_response['access_token']
self.log.info("Successfully authenticated with cookies.")
else:
self.log.error("No access_token found in response.")
raise EnvironmentError("Cookie authentication failed: no access token in response.")
if 'access_token' not in auth_response:
raise RuntimeError("Cookie authentication failed: no access token in response.")
self._rtlp_auth_jwt = auth_response['access_token']
self.log.info("Successfully authenticated with cookies.")
def _authenticate_anonymous(self) -> None:
token_url = self.config["endpoints"]["token_url"]
client_secret = self.config["client"]["secret"]
response = self.session.post(
token_url,
headers={
**self.session.headers,
'Content-Type': 'application/x-www-form-urlencoded'
'Content-Type': 'application/x-www-form-urlencoded',
},
data=bytes('grant_type=client_credentials&client_id=anonymous-user&client_secret=4bfeb73f-1c4a-4e9f-a7fa-96aa1ad3d94c', 'utf-8'),
data=bytes(
f'grant_type=client_credentials&client_id={self.config["client"]["id"]}'
f'&client_secret={client_secret}',
'utf-8',
),
)
response.raise_for_status()
auth_response = response.json()
if 'access_token' not in auth_response:
raise RuntimeError("Anonymous authentication failed: no access token in response.")
if 'access_token' in auth_response:
self._rtlp_auth_jwt = auth_response['access_token']
self.log.info("Authenticated anonymously with RTL+ service successfully.")
else:
self.log.error("No access_token found in response.")
raise EnvironmentError("Anonymous authentication failed: no access token in response.")
self._rtlp_auth_jwt = auth_response['access_token']
self.log.info("Authenticated anonymously with RTL+ service.")
# Required methods:
# ------------------------------------------------------------------
# Validation helpers
# ------------------------------------------------------------------
@staticmethod
def _validate_required_fields(data: dict, fields: list[str], context: str) -> None:
"""Raise ValueError if any required fields are missing from data."""
missing = [f for f in fields if f not in data]
if missing:
raise ValueError(f"Missing required fields {missing} in {context}.")
# ------------------------------------------------------------------
# Required service methods
# ------------------------------------------------------------------
def get_titles(self) -> Union[Movies, Series]:
graphql_url = self.config["endpoints"]["graphql_url"]
match = re.match(self.TITLE_RE, self.title)
if not match:
raise ValueError(f"Could not parse ID from title — is the URL correct? ({self.title})")
try:
kind, show_id, season_id, episode_id = (
re.match(self.TITLE_RE, self.title).group(i) for i in ("kind", "show_id", "season_id", "episode_id")
)
except Exception:
raise ValueError("Could not parse ID from title - is the URL correct?")
kind = match.group("kind")
show_id = match.group("show_id")
season_id = match.group("season_id")
episode_id = match.group("episode_id")
if not kind or not show_id:
self.log.error(f"Invalid title URL: {self.title}. 'kind' or 'show_id' is missing.")
raise ValueError("Invalid title URL: 'kind' or 'show_id' is missing.")
raise ValueError(f"Invalid title URL: 'kind' or 'show_id' is missing. ({self.title})")
if kind == "filme":
content_id = f'rrn:watch:videohub:movie:{show_id}'
return self._get_movie_titles(show_id)
elif kind in ("shows", "serien"):
return self._get_series_titles(show_id, season_id, episode_id)
response_data = self._execute_graphql_query('MovieDetail', {'id': content_id}, 'b1c360212cc518ddca2b8377813a54fa918ca424c08086204b7bf7d6ef626ac4')
raise ValueError(f"Unsupported content type: '{kind}'.")
if 'movie' in response_data:
movie_data = response_data['movie']
def _get_movie_titles(self, show_id: str) -> Movies:
"""Fetch and return movie title data."""
content_id = f'rrn:watch:videohub:movie:{show_id}'
if 'id' not in movie_data or 'title' not in movie_data or 'productionYear' not in movie_data:
self.log.error("Invalid movie_data data received.")
raise ValueError("Invalid movie_data data received from RTL+ service.")
response_data = self._execute_graphql_query(
'MovieDetail', {'id': content_id},
'b1c360212cc518ddca2b8377813a54fa918ca424c08086204b7bf7d6ef626ac4',
)
self.log.debug(f"Movie ID: {content_id}, Title: {movie_data['title']}")
movie_data = response_data.get('movie')
if not movie_data:
raise ValueError("Failed to fetch movie data from RTL+ service.")
return Movies(
[
Movie(
id_=content_id,
service=self.__class__,
name=movie_data['title'],
data=movie_data,
year=movie_data['productionYear'],
)
]
)
self._validate_required_fields(movie_data, ['id', 'title', 'productionYear'], 'movie_data')
self.log.error(f"Failed to fetch Movie data: {response.status_code} - {response.text}")
raise EnvironmentError("Failed to fetch Movie data from RTL+ service.")
self.log.debug(f"Movie ID: {content_id}, Title: {movie_data['title']}")
if kind == "shows" or kind == "serien":
if episode_id:
content_id = f'rrn:watch:videohub:episode:{episode_id}'
if self._is_premium(movie_data) and not self.premium_bypass:
self.log.info(f"Skipping premium movie: {movie_data['title']} (use --premium-bypass to download)")
return Movies([])
response_data = self._execute_graphql_query('EpisodeDetail', {'episodeId': content_id}, '2e5ef142c79f8620e8e93c8f21b31a463b16d89a557f7f5f0c4a7e063be96a8a')
return Movies([
Movie(
id_=content_id,
service=self.__class__,
name=movie_data['title'],
data=movie_data,
year=movie_data['productionYear'],
)
])
if 'episode' in response_data:
episode_data = response_data['episode']
def _get_series_titles(self, show_id: str, season_id: Optional[str], episode_id: Optional[str]) -> Series:
"""Route to the appropriate series fetch method based on URL specificity."""
if episode_id:
return self._get_single_episode(episode_id)
elif season_id:
return self._get_single_season(season_id)
elif show_id:
return self._get_full_show(show_id)
if 'id' not in episode_data or 'title' not in episode_data or 'number' not in episode_data or 'episodeSeason' not in episode_data:
self.log.error("Invalid episode data received.")
raise ValueError("Invalid episode data received from RTL+ service.")
raise ValueError("No valid identifier found in series URL.")
if 'format' not in episode_data and 'title' not in episode_data['format']:
self.log.error("Invalid episode format received.")
raise ValueError("Invalid episode format received from RTL+ service.")
def _get_single_episode(self, episode_id: str) -> Series:
"""Fetch a single episode by its ID."""
content_id = f'rrn:watch:videohub:episode:{episode_id}'
return Series(
[
Episode(
id_=content_id,
service=self.__class__,
title=episode_data['format']['title'],
season=self.get_episode_session(episode_data),
number=episode_data['number'],
name=episode_data['title'],
data=episode_data,
)
]
)
elif season_id:
content_id = f'rrn:watch:videohub:season:{season_id}'
response_data = self._execute_graphql_query(
'EpisodeDetail', {'episodeId': content_id},
'2e5ef142c79f8620e8e93c8f21b31a463b16d89a557f7f5f0c4a7e063be96a8a',
)
response_data = self._execute_graphql_query('SeasonWithFormatAndEpisodes', {'seasonId': content_id}, 'cc0fbbe17143f549a35efa6f8665ceb9b1cfae44b590f0b2381a9a304304c584')
episode_data = response_data.get('episode')
if not episode_data:
raise ValueError("Failed to fetch episode data from RTL+ service.")
if 'season' in response_data:
season_data = response_data['season']
self._validate_required_fields(episode_data, ['id', 'title', 'number', 'episodeSeason'], 'episode_data')
if 'format' not in season_data or 'title' not in season_data['format']:
self.log.error("Invalid season format received.")
raise ValueError("Invalid season format received from RTL+ service.")
if 'format' not in episode_data or 'title' not in episode_data['format']:
raise ValueError("Missing 'format.title' in episode_data.")
if not 'episodes' in season_data or not isinstance(season_data['episodes'], list):
self.log.error("Invalid season data received.")
raise ValueError("Invalid season data received from RTL+ service.")
if self._is_premium(episode_data) and not self.premium_bypass:
self.log.info(f"Skipping premium episode: {episode_data['title']} (use --premium-bypass to download)")
return Series([])
episodes = []
for episode in season_data['episodes']:
if 'id' not in episode or 'title' not in episode or 'number' not in episode or 'episodeSeason' not in episode:
self.log.error("Invalid episode data received.")
raise ValueError("Invalid episode data received from RTL+ service.")
return Series([
Episode(
id_=content_id,
service=self.__class__,
title=episode_data['format']['title'],
season=self._get_episode_season(episode_data),
number=episode_data['number'],
name=episode_data['title'],
data=episode_data,
)
])
episodes.append(
Episode(
id_=episode['id'],
service=self.__class__,
title=season_data['format']['title'],
season=self.get_episode_session(episode),
number=episode['number'],
name=episode['title'],
data=episode,
)
)
def _get_single_season(self, season_id: str) -> Series:
"""Fetch all episodes from a single season."""
content_id = f'rrn:watch:videohub:season:{season_id}'
return Series(
episodes
)
response_data = self._execute_graphql_query(
'SeasonWithFormatAndEpisodes', {'seasonId': content_id},
'cc0fbbe17143f549a35efa6f8665ceb9b1cfae44b590f0b2381a9a304304c584',
)
elif show_id:
content_id = f'rrn:watch:videohub:format:{show_id}'
season_data = response_data.get('season')
if not season_data:
raise ValueError("Failed to fetch season data from RTL+ service.")
response_data = self._execute_graphql_query('Format', {'id': content_id}, 'd112638c0184ab5698af7b69532dfe2f12973f7af9cb137b9f70278130b1eafa')
if 'format' not in season_data or 'title' not in season_data['format']:
raise ValueError("Missing 'format.title' in season_data.")
if 'format' in response_data:
format_data = response_data['format']
if 'episodes' not in season_data or not isinstance(season_data['episodes'], list):
raise ValueError("Missing or invalid 'episodes' in season_data.")
if 'title' not in format_data or 'id' not in format_data:
self.log.error("Invalid format data received.")
raise ValueError("Invalid format data received from RTL+ service.")
episodes = []
for episode in season_data['episodes']:
self._validate_required_fields(episode, ['id', 'title', 'number', 'episodeSeason'], 'episode')
if 'seasons' not in format_data or not isinstance(format_data['seasons'], list):
self.log.error("Invalid format seasons data received.")
raise ValueError("Invalid format seasons data received from RTL+ service.")
if self._is_premium(episode) and not self.premium_bypass:
self.log.info(f"Skipping premium episode: {episode['title']} (use --premium-bypass to download)")
continue
episodes = []
for season in format_data['seasons']:
if not 'id' in season or not 'seasonType' in season:
self.log.error("Invalid season data received.")
raise ValueError("Invalid season data received from RTL+ service.")
episodes.append(Episode(
id_=episode['id'],
service=self.__class__,
title=season_data['format']['title'],
season=self._get_episode_season(episode),
number=episode['number'],
name=episode['title'],
data=episode,
))
season_id = season['id']
return Series(episodes)
response_data = self._execute_graphql_query('SeasonWithFormatAndEpisodes', {'seasonId': season_id}, 'cc0fbbe17143f549a35efa6f8665ceb9b1cfae44b590f0b2381a9a304304c584')
def _get_full_show(self, show_id: str) -> Series:
"""Fetch all episodes across all seasons of a show/format."""
content_id = f'rrn:watch:videohub:format:{show_id}'
if 'season' in response_data:
season_data = response_data['season']
response_data = self._execute_graphql_query(
'Format', {'id': content_id},
'd112638c0184ab5698af7b69532dfe2f12973f7af9cb137b9f70278130b1eafa',
)
if 'format' not in season_data or 'title' not in season_data['format']:
self.log.error("Invalid season format received.")
raise ValueError("Invalid season format received from RTL+ service.")
format_data = response_data.get('format')
if not format_data:
raise ValueError("Failed to fetch format data from RTL+ service.")
if not 'episodes' in season_data or not isinstance(season_data['episodes'], list):
self.log.error("Invalid season data received.")
raise ValueError("Invalid season data received from RTL+ service.")
self._validate_required_fields(format_data, ['id', 'title', 'seasons'], 'format_data')
for episode in season_data['episodes']:
if 'id' not in episode or 'title' not in episode or 'number' not in episode or 'episodeSeason' not in episode:
self.log.error("Invalid episode data received.")
raise ValueError("Invalid episode data received from RTL+ service.")
if not isinstance(format_data['seasons'], list):
raise ValueError("Invalid 'seasons' in format_data (expected list).")
episodes.append(
Episode(
id_=episode['id'],
service=self.__class__,
title=season_data['format']['title'],
season=self.get_episode_session(episode),
number=episode['number'],
name=episode['title'],
data=episode,
)
)
episodes = []
for season in format_data['seasons']:
self._validate_required_fields(season, ['id', 'seasonType'], 'season')
return Series(
episodes
)
season_response = self._execute_graphql_query(
'SeasonWithFormatAndEpisodes', {'seasonId': season['id']},
'cc0fbbe17143f549a35efa6f8665ceb9b1cfae44b590f0b2381a9a304304c584',
)
self.log.error(f"Failed to fetch series data: {response.status_code} - {response.text}")
raise EnvironmentError("Failed to fetch series data from RTL+ service.")
season_data = season_response.get('season')
if not season_data:
continue
if 'format' not in season_data or 'title' not in season_data['format']:
raise ValueError("Missing 'format.title' in season_data.")
if 'episodes' not in season_data or not isinstance(season_data['episodes'], list):
raise ValueError("Missing or invalid 'episodes' in season_data.")
for episode in season_data['episodes']:
self._validate_required_fields(episode, ['id', 'title', 'number', 'episodeSeason'], 'episode')
if self._is_premium(episode) and not self.premium_bypass:
self.log.info(f"Skipping premium episode: {episode['title']} (use --premium-bypass to download)")
continue
episodes.append(Episode(
id_=episode['id'],
service=self.__class__,
title=season_data['format']['title'],
season=self._get_episode_season(episode),
number=episode['number'],
name=episode['title'],
data=episode,
))
return Series(episodes)
def get_tracks(self, title: Union[Episode, Movie]) -> Tracks:
playout_url = self.config["endpoints"]["playout"]
if not isinstance(title, (Episode, Movie)):
raise TypeError(f"Expected Episode or Movie, got {type(title).__name__}.")
if isinstance(title, Episode) or isinstance(title, Movie):
playout_url = playout_url.format(id=title.data['id'])
else:
self.log.error(f"Unsupported title type: {type(title)}. Expected Series or Movies.")
raise ValueError(f"Unsupported title type: {type(title)}. Expected Series or Movies.")
playout_url = self.config["endpoints"]["playout"].format(id=title.data['id'])
response = self.session.get(
playout_url,
headers={
**self.session.headers,
'x-auth-token': self._rtlp_auth_jwt
}
'x-auth-token': self._rtlp_auth_jwt,
},
)
response.raise_for_status()
if response and response.status_code == 200:
response_data = response.json()
response_data = response.json()
all_parsed_tracks = [] # Use a list to collect all individual track objects
all_parsed_tracks = []
for variant in response_data:
if 'name' not in variant:
raise ValueError("Invalid playout variant data: missing 'name'.")
for variant in response_data:
if 'name' not in variant:
self.log.error("Invalid playout variant data received.")
raise ValueError("Invalid playout variant data received from RTL+ service.")
if variant['name'] != 'dashhd':
continue
# Assuming 'dashsd' and 'dashhd' variants contain the MPD URLs
if variant['name'] == 'dashhd':
if 'sources' not in variant and len(variant['sources']) == 0:
self.log.warning(f"Variant '{variant['name']}' has no sources. Skipping.")
continue
if 'sources' not in variant or len(variant['sources']) == 0:
self.log.warning(f"Variant '{variant['name']}' has no sources. Skipping.")
continue
source_entry = variant['sources'][0]
source_entry = variant['sources'][0]
# Assuming the 'url' key in each source_entry is the DASH manifest URL
if not 'url' in source_entry:
self.log.warning(f"DASH source entry missing 'url': {source_entry}. Skipping.")
continue
if 'url' not in source_entry:
self.log.warning(f"DASH source entry missing 'url': {source_entry}. Skipping.")
continue
manifest_url = source_entry['url']
manifest_url = source_entry['url']
try:
all_parsed_tracks = DASH.from_url(manifest_url, self.session).to_tracks(language="de") # Use title's language for filtering/tagging
except Exception as e:
self.log.error(f"Failed to parse DASH manifest from {manifest_url}: {e}")
# Decide if you want to raise or just log and continue for other manifests
continue
try:
all_parsed_tracks = DASH.from_url(manifest_url, self.session).to_tracks(language="de")
except Exception as e:
self.log.error(f"Failed to parse DASH manifest from {manifest_url}: {e}")
continue
# Return a new Tracks object containing all collected tracks
return Tracks(all_parsed_tracks)
return Tracks(all_parsed_tracks)
else:
self.log.error(f"Failed to fetch tracks data: {response.status_code} - {response.text}")
raise EnvironmentError("Failed to fetch tracks data from RTL+ service.")
def get_chapters(self, title: Union[Movies, Series]) -> list[Chapter]:
# technically optional, but you must define and at least `return []`.
def get_chapters(self, title: Union[Movies, Series]) -> list:
return Chapters()
def get_widevine_service_certificate(self, *, challenge: bytes, title: Union[Movies, Series], track: AnyTrack) -> Union[bytes, str]:
return None
def get_widevine_license(self, *, challenge: bytes, title: Union[Movies, Series], track: AnyTrack) -> Optional[Union[bytes, str]]:
def get_widevine_license(
self, *, challenge: bytes, title: Union[Movies, Series], track: AnyTrack,
) -> Optional[Union[bytes, str]]:
"""Obtain a Widevine license for the given track."""
license_url = self.config["endpoints"]["license"]
response = self.session.post(
license_url,
headers={
**self.session.headers,
'Content-Type': 'application/octet-stream',
'x-auth-token': self._rtlp_auth_jwt
'x-auth-token': self._rtlp_auth_jwt,
},
data=challenge
data=challenge,
)
response.raise_for_status()
if response.status_code != 200:
self.log.error(f"Failed to fetch license: {response.status_code} - {response.text}")
raise ConnectionError(response.text)
self.log.info("Successfully fetched Widevine license from RTL+ service.")
self.log.info("Successfully fetched Widevine license.")
return response.content
# ------------------------------------------------------------------
# GraphQL & utility helpers
# ------------------------------------------------------------------
def _execute_graphql_query(self, operation_name: str, variables: dict, persisted_query_hash: str) -> dict:
"""Execute a persisted GraphQL query and return the 'data' payload."""
response = self.session.get(
self.config["endpoints"]["graphql_url"],
headers={
**self.session.headers,
'Authorization': f'Bearer {self._rtlp_auth_jwt}'
'Authorization': f'Bearer {self._rtlp_auth_jwt}',
},
params={
'operationName': operation_name,
'variables': json.dumps(variables).encode(),
'variables': json.dumps(variables),
'extensions': json.dumps({
'persistedQuery': {'version': 1, 'sha256Hash': persisted_query_hash}
}).encode()
}
'persistedQuery': {'version': 1, 'sha256Hash': persisted_query_hash},
}).encode(),
},
)
response.raise_for_status()
response_data = response.json()
if response_data.get("errors"):
raise ValueError(f"GraphQL errors for '{operation_name}': {response_data['errors']}")
if 'data' not in response_data:
self.log.error(f"GraphQL response for '{operation_name}' missing 'data' field.")
raise ValueError(f"Invalid GraphQL response for '{operation_name}'.")
return response_data['data']
def get_episode_session(self, episode) -> int:
season_value = None
if 'seasonType' not in episode['episodeSeason']:
self.log.error("Invalid episode season received.")
raise ValueError("Invalid episode season received from RTL+ service.")
def _is_premium(self, data: dict) -> bool:
"""Check if content is premium-only (skipped unless premium_bypass is enabled)."""
return data.get('tier') == 'PREMIUM'
seasonType = episode['episodeSeason']['seasonType']
@staticmethod
def _get_episode_season(episode: dict) -> Optional[int]:
"""Extract the season number (ordinal or year) from episode data."""
episode_season = episode.get('episodeSeason', {})
season_type = episode_season.get('seasonType')
if seasonType == 'ANNUAL':
if 'season' in episode['episodeSeason'] and 'year' in episode['episodeSeason']['season']:
season_value = int(episode['episodeSeason']['season']['year'])
elif seasonType == 'ORDINAL':
if 'season' in episode['episodeSeason'] and 'ordinal' in episode['episodeSeason']['season']:
season_value = int(episode['episodeSeason']['season']['ordinal'])
else:
self.log.error(f"Unknown season type '{seasonType}' received.")
raise ValueError(f"Unknown season type '{seasonType}' received from RTL+ service.")
if not season_type:
raise ValueError("Missing 'seasonType' in episode season data.")
return season_value
season_info = episode_season.get('season', {})
if season_type == 'ANNUAL':
year = season_info.get('year')
return int(year) if year is not None else None
elif season_type == 'ORDINAL':
ordinal = season_info.get('ordinal')
return int(ordinal) if ordinal is not None else None
raise ValueError(f"Unknown season type '{season_type}'.")

View File

@@ -10,4 +10,5 @@ endpoints:
license: "https://rtlplus-widevine.streamingtech.de/index/rtlplus"
client:
id: "2a970b6d-adf2-4cf6-833f-9d940c300d09"
id: "2a970b6d-adf2-4cf6-833f-9d940c300d09"
secret: "4bfeb73f-1c4a-4e9f-a7fa-96aa1ad3d94c"