Added project source files

This commit is contained in:
2026-01-11 19:39:09 +01:00
commit 0a49e79cf1
6 changed files with 1103 additions and 0 deletions

170
.gitignore vendored Normal file
View File

@@ -0,0 +1,170 @@
# ---> Python
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# UV
# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
#uv.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

441
JOYNAT/__init__.py Normal file
View File

@@ -0,0 +1,441 @@
from __future__ import annotations
from http.cookiejar import MozillaCookieJar
from typing import Any, Optional, Union
import click
import requests
from devine.core.titles import Episode, Movie, Movies, Series
from devine.core.manifests import DASH
from devine.core.constants import AnyTrack
from devine.core.service import Service
from devine.core.titles import Movies, Series
from devine.core.tracks import Chapter, Tracks
from devine.core.credential import Credential
from devine.core.tracks import Chapters, Tracks, Track
import re
import json
import base64
import hashlib
import secrets
class JOYNAT(Service):
# List of Service Aliases. Do NOT include the Service Tag. All aliases must be lowercase.
ALIASES = ()
# List of regions of which the service offers support for.
GEOFENCE = ("at")
TITLE_RE = r"^https?:\/\/www\.joyn\.at\/(?:play\/)?(?P<type>filme|serien)\/(?P<content_id>.+)$"
AUTH_CODE_REGEX = r"[&?]code=([^&]+)"
@staticmethod
@click.command(name="JOYN", short_help="https://joyn.at", help=__doc__)
@click.argument("title", type=str)
@click.pass_context
def cli(ctx: click.Context, **kwargs: Any) -> JOYNAT:
return JOYNAT(ctx, **kwargs)
def __init__(self, ctx: click.Context, title: str):
self.title = title
super().__init__(ctx)
def get_session(self) -> requests.Session:
# modify the creation of the requests session (stored as self.session)
# make a super() call to take the original result and further modify it,
# or don't to make a completely fresh one if required.
session = super().get_session()
# Set default headers as specified
session.headers.update({
'Accept': '*/*',
'Accept-Language': 'de,de-DE;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
'Origin': 'https://www.joyn.at/',
'Referer': 'https://www.joyn.at/',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36 Edg/137.0.0.0',
'Joyn-Platform': 'web',
'Joyn-Distribution-Tenant': 'JOYN_AT',
'Joyn-Country': 'AT',
'Joyn-Client-Version': '5.1370.1',
})
return session
def authenticate(self, cookies: Optional[MozillaCookieJar] = None, credential: Optional[Credential] = None) -> None:
super().authenticate(cookies, credential) # important
if not cookies:
self._authenticate_anonymous()
else:
self._authenticate_with_cookies(cookies)
def generate_code_verifier(self):
return secrets.token_urlsafe(64)
def generate_code_challenge(self, verifier):
sha256_hash = hashlib.sha256(verifier.encode()).digest()
return base64.urlsafe_b64encode(sha256_hash).decode().rstrip("=")
def _authenticate_with_cookies(self, cookies: MozillaCookieJar) -> None:
auth_url = self.config["endpoints"]["auth_url"]
token_url = self.config["endpoints"]["token_url"]
redirect_uri = self.config["endpoints"]["redirect_uri"]
client_id = self.config["client"]["idc"]
code_verifier = self.generate_code_verifier()
code_challenge = self.generate_code_challenge(code_verifier)
self.session.cookies.update(cookies)
redirect_url_request = self.session.get(
auth_url,
headers={
**self.session.headers,
},
params={
'response_type': 'code',
'scope': 'openid email profile offline_access',
'view_type': 'login',
'client_id': client_id,
'prompt': 'consent',
'response_mode': 'query',
'cmpUcId': '9464e7a80af12c8cbdfbf2117f07f410b65af6af04ff3eee58ea2754590dfc83',
'redirect_uri': redirect_uri,
'code_challenge': code_challenge,
'code_challenge_method': 'S256',
},
)
redirect_url_request.raise_for_status()
redirect_url = redirect_url_request.url
# Find the auth_code using regex
auth_code_match = re.search(self.AUTH_CODE_REGEX, redirect_url)
if auth_code_match:
auth_code = auth_code_match.group(1)
self.log.debug(f"Auth Code: {auth_code}")
else:
self.log.error("Authorization code not found in redirect URL.")
raise EnvironmentError("Could not find authorization code.")
response = self.session.post(
token_url,
headers={
**self.session.headers,
'Content-Type': 'application/json'
},
cookies=cookies,
json={
'code': auth_code,
'client_id': client_id,
'redirect_uri': redirect_uri,
'code_verifier': code_verifier
},
)
response.raise_for_status()
auth_response = response.json()
if 'access_token' in auth_response:
self._joyn_auth_jwt = auth_response['access_token']
self.log.info("Successfully authenticated with cookies.")
else:
self.log.error("No access_token found in response.")
raise EnvironmentError("Cookie authentication failed: no access token in response.")
def _authenticate_anonymous(self) -> None:
token_url = self.config["endpoints"]["anon_auth_url"]
client_id = self.config["client"]["id"]
client_name = self.config["client"]["name"]
anon_device_id = self.generate_code_verifier()
response = self.session.post(
token_url,
headers={
**self.session.headers,
'Content-Type': 'application/json'
},
json={'client_id': client_id, 'client_name': client_name, 'anon_device_id': anon_device_id},
)
response.raise_for_status()
auth_response = response.json()
self.log.info(f"Anonymous auth response: {auth_response}")
if 'access_token' in auth_response:
self._joyn_auth_jwt = auth_response['access_token']
self.log.info("Authenticated anonymously with Joyn service successfully.")
else:
self.log.error("No access_token found in response.")
raise EnvironmentError("Anonymous authentication failed: no access token in response.")
# Required methods:
def get_titles(self) -> Union[Movies, Series]:
graphql_url = self.config["endpoints"]["graphql_url"]
api_key = self.config["client"]["api_key"]
client_name = self.config["client"]["name"]
try:
match = re.match(self.TITLE_RE, self.title)
if not match:
self.log.error(f"Invalid title URL format: {self.title}")
raise ValueError("Invalid title URL format.")
kind = match.group("type")
content_id = match.group("content_id")
except Exception:
raise ValueError("Could not parse ID from title - is the URL correct?")
if not kind or not content_id:
self.log.error(f"Invalid title URL: {self.title}. 'kind' or 'content_id' is missing.")
raise ValueError("Invalid title URL: 'kind' or 'content_id' is missing.")
if kind == "filme":
path = f'/filme/{content_id}'
response_data = self._execute_graphql_query('PageMovieDetailNewStatic', {'path': path}, '7b49493138f2162be230fd0e3fbf5722b1db6700a8842109ed3d98979898707a')
if 'page' in response_data and 'movie' in response_data['page']:
movie_data = response_data['page']['movie']
if 'id' not in movie_data or 'title' not in movie_data or 'productionYear' not in movie_data:
self.log.error("Invalid movie_data data received.")
raise ValueError("Invalid movie_data data received from Joyn service.")
if 'video' not in movie_data or 'id' not in movie_data['video']:
self.log.error("Invalid movie_data data received.")
raise ValueError("Invalid movie_data data received from Joyn service.")
return Movies(
[
Movie(
id_=movie_data['id'],
service=self.__class__,
name=movie_data['title'],
data=movie_data,
year=movie_data['productionYear'],
)
]
)
if kind == "serien":
path = f'/serien/{content_id}'
if len(content_id.split("/")) == 1:
response_data = self._execute_graphql_query('SeriesDetailPageStatic', {'path': path}, '43cad327eeae12e14dfb629d662ebc947d78b71ec91d972ea1ef46ccdb29eede')
if 'page' in response_data and 'series' in response_data['page']:
series_data = response_data['page']['series']
if 'title' not in series_data or 'allSeasons' not in series_data:
self.log.error("Invalid series_data data received.")
raise ValueError("Invalid series_data data received from Joyn service.")
episodes = []
for season in series_data['allSeasons']:
if 'id' not in season or 'number' not in season:
self.log.error("Invalid series_data data received.")
raise ValueError("Invalid series_data data received from Joyn service.")
#
response_data = self._execute_graphql_query('Season', {"id": season["id"]}, 'ee2396bb1b7c9f800e5cefd0b341271b7213fceb4ebe18d5a30dab41d703009f')
if 'season' in response_data:
season_data = response_data['season']
if 'episodes' not in season_data:
self.log.error("Invalid season_data data received.")
raise ValueError("Invalid season_data data received from Joyn service.")
for episode in season_data['episodes']:
if 'id' not in episode or 'title' not in episode or 'number' not in episode:
self.log.error("Invalid episode data received.")
raise ValueError("Invalid episode data received from Joyn service.")
if 'video' not in episode or 'id' not in episode['video']:
self.log.error("Invalid episode data received.")
raise ValueError("Invalid episode data received from Joyn service.")
episodes.append(
Episode(
id_=episode['id'],
service=self.__class__,
title=series_data['title'],
season=season['number'],
number=episode['number'],
name=episode['title'],
data=episode,
)
)
return Series(episodes)
elif len(content_id.split("/")) == 2:
response_data = self._execute_graphql_query('EpisodeDetailPageStatic', {'path': path}, 'c4bcacee94d38133e87879dad8d69bd8a74c7326262a1848cceb964b871c1551')
if 'page' in response_data and 'episode' in response_data['page']:
episode_data = response_data['page']['episode']
if 'id' not in episode_data or 'title' not in episode_data or 'number' not in episode_data:
self.log.error("Invalid episode_data data received.")
raise ValueError("Invalid episode_data data received from Joyn service.")
if 'season' not in episode_data or 'number' not in episode_data['season']:
self.log.error("Invalid episode_data data received.")
raise ValueError("Invalid episode_data data received from Joyn service.")
if 'series' not in episode_data or 'title' not in episode_data['series']:
self.log.error("Invalid episode_data data received.")
raise ValueError("Invalid episode_data data received from Joyn service.")
if 'video' not in episode_data or 'id' not in episode_data['video']:
self.log.error("Invalid episode_data data received.")
raise ValueError("Invalid episode_data data received from Joyn service.")
return Series(
[
Episode(
id_=episode_data['id'],
service=self.__class__,
title=episode_data['series']['title'],
season=episode_data['season']['number'],
number=episode_data['number'],
name=episode_data['title'],
data=episode_data,
)
]
)
self.log.error(f"Failed to fetch Movie data: {response.status_code} - {response.text}")
raise EnvironmentError("Failed to fetch Movie data from Joyn service.")
def get_tracks(self, title: Union[Episode, Movie]) -> Tracks:
entitlement_url = self.config["endpoints"]["entitlement"]
playout_url = self.config["endpoints"]["playout"]
if not isinstance(title, Episode) and not isinstance(title, Movie):
self.log.error(f"Unsupported title type: {type(title)}. Expected Series or Movies.")
raise ValueError(f"Unsupported title type: {type(title)}. Expected Series or Movies.")
content_id = title.data['video']['id']
entitlement = self.session.post(
entitlement_url,
headers={
**self.session.headers,
'Authorization': f'Bearer {self._joyn_auth_jwt}'
},
json={'content_id': content_id, 'content_type': 'VOD'}
)
entitlement.raise_for_status()
entitlement_data = entitlement.json()
if 'entitlement_token' not in entitlement_data:
self.log.error(f"Failed to fetch tracks entitlement: 'entitlement_token' not in entitlement_data")
raise EnvironmentError("Failed to fetch tracks entitlement from Joyn service.")
entitlement_token = entitlement_data['entitlement_token']
playlist = self.session.post(
playout_url.format(content_id=content_id),
headers={
**self.session.headers,
'Authorization': f'Bearer {entitlement_token}'
},
json={
'manufacturer': 'unknown',
'platform': 'browser',
'maxSecurityLevel': 1,
'streamingFormat': 'dash',
'model': 'unknown',
'protectionSystem': 'widevine',
'enableDolbyAudio': False,
'enableSubtitles': True,
'maxResolution': 1080,
'variantName': 'default',
}
)
playlist.raise_for_status()
playlist_data = playlist.json()
if not 'manifestUrl' in playlist_data or 'licenseUrl' not in playlist_data:
self.log.error(f"Failed to fetch tracks playlist: 'manifestUrl' or 'licenseUrl' not in entitlement_data")
raise EnvironmentError("Failed to fetch tracks playlist from Joyn service.")
manifest_url = playlist_data['manifestUrl']
license_url = playlist_data['licenseUrl']
all_tracks = DASH.from_url(manifest_url, self.session).to_tracks(language="de")
# Attach license_url to each track's data dictionary
for tr in all_tracks:
if tr.data is None:
tr.data = {}
tr.data['license_url'] = license_url
# Return a new Tracks object containing all collected tracks
return Tracks(all_tracks)
def get_chapters(self, title: Union[Movies, Series]) -> list[Chapter]:
# technically optional, but you must define and at least `return []`.
return Chapters()
def get_widevine_service_certificate(self, *, challenge: bytes, title: Union[Movies, Series], track: AnyTrack) -> Union[bytes, str]:
return None
def get_widevine_license(self, *, challenge: bytes, title: Union[Movies, Series], track: AnyTrack) -> Optional[Union[bytes, str]]:
# Safely extract license_url from track.data
if hasattr(track, "data") and track.data:
license_url = track.data.get("license_url")
if not license_url:
raise ValueError("No license_url in track.data")
response = self.session.post(
license_url,
headers={
**self.session.headers,
'Content-Type': 'application/octet-stream',
'x-auth-token': self._joyn_auth_jwt
},
data=challenge
)
if response.status_code != 200:
self.log.error(f"Failed to fetch license: {response.status_code} - {response.text}")
raise ConnectionError(response.text)
self.log.info("Successfully fetched Widevine license from Joyn service.")
return response.content
def _execute_graphql_query(self, operation_name: str, variables: dict, persisted_query_hash: str) -> dict:
response = self.session.get(
self.config["endpoints"]["graphql_url"],
headers={
**self.session.headers,
'X-Api-Key': self.config["client"]["api_key"]
},
params={
'operationName': operation_name,
'variables': json.dumps(variables).encode(),
'extensions': json.dumps({
'persistedQuery': {'version': 1, 'sha256Hash': persisted_query_hash}
}).encode()
}
)
response.raise_for_status()
response_data = response.json()
if 'data' not in response_data:
self.log.error(f"GraphQL response for '{operation_name}' missing 'data' field.")
raise ValueError(f"Invalid GraphQL response for '{operation_name}'.")
return response_data['data']

18
JOYNAT/config.yaml Normal file
View File

@@ -0,0 +1,18 @@
# This config file is automatically loaded into `self.config` class instance variable.
# I recommend storing information like any de-obfuscated keys, base hosts, endpoints,
# or other such configuration data.
endpoints:
anon_auth_url : "https://auth.joyn.de/auth/anonymous"
auth_url: "https://auth.7pass.de/authz-srv/authz"
token_url: "https://auth.joyn.de/auth/7pass/token"
redirect_uri: "https://www.joyn.at/oauth"
graphql_url: "https://api.joyn.de/graphql"
entitlement: "https://entitlement.p7s1.io/api/user/entitlement-token"
playout: "https://api.vod-prd.s.joyn.de/v1/asset/{content_id}/playlist"
client:
id: "bb4f9c4c-82ca-486d-8eb5-8aaf772df93c"
idc: "ae892ce5-8920-4f38-b272-af7d1e242579"
name: "web"
api_key: "4f0fd9f18abbe3cf0e87fdb556bc39c8"

3
README.md Normal file
View File

@@ -0,0 +1,3 @@
# devine_services
Devine Services für Joyn und RTL Plus

458
RTLP/__init__.py Normal file
View File

@@ -0,0 +1,458 @@
from __future__ import annotations
from http.cookiejar import MozillaCookieJar
from typing import Any, Optional, Union
import click
import requests
from devine.core.titles import Episode, Movie, Movies, Series
from devine.core.manifests import DASH
from devine.core.constants import AnyTrack
from devine.core.service import Service
from devine.core.credential import Credential
from devine.core.tracks import Chapters, Tracks, Track
import re
import json
import base64
import hashlib
import secrets
class RTLP(Service):
# List of Service Aliases. Do NOT include the Service Tag. All aliases must be lowercase.
ALIASES = ()
# List of regions of which the service offers support for.
GEOFENCE = ("de", "at")
TITLE_RE = r"^https?:\/\/plus\.rtl\.de\/video-tv\/(?P<kind>shows|serien|filme)\/(?:[^\/]+-)?(?P<show_id>\d+)(?:\/[^\/]+-)?(?P<season_id>\d+)?(?:\/[^\/]+-)?(?P<episode_id>\d+)?$"
AUTH_CODE_REGEX = r"code=([\w-]+\.[\w-]+\.[\w-]+)"
@staticmethod
@click.command(name="RTLP", short_help="https://plus.rtl.de", help=__doc__)
@click.argument("title", type=str)
@click.pass_context
def cli(ctx: click.Context, **kwargs: Any) -> RTLP:
return RTLP(ctx, **kwargs)
def __init__(self, ctx: click.Context, title: str):
self.title = title
super().__init__(ctx)
def get_session(self) -> requests.Session:
# modify the creation of the requests session (stored as self.session)
# make a super() call to take the original result and further modify it,
# or don't to make a completely fresh one if required.
session = super().get_session()
# Set default headers as specified
session.headers.update({
'Accept': '*/*',
'Accept-Language': 'de,de-DE;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
'Origin': 'https://plus.rtl.de',
'Referer': 'https://plus.rtl.de/',
'Rtlplus-Client-Id': 'rci:rtlplus:web',
'Rtlplus-Client-Version': '2024.7.29.2',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36 Edg/137.0.0.0'
})
return session
def authenticate(self, cookies: Optional[MozillaCookieJar] = None, credential: Optional[Credential] = None) -> None:
super().authenticate(cookies, credential) # important
if not cookies:
self._authenticate_anonymous()
else:
self._authenticate_with_cookies(cookies)
def generate_code_verifier(self):
return secrets.token_urlsafe(64)
def generate_code_challenge(self, verifier):
sha256_hash = hashlib.sha256(verifier.encode()).digest()
return base64.urlsafe_b64encode(sha256_hash).decode().rstrip("=")
def _authenticate_with_cookies(self, cookies: MozillaCookieJar) -> None:
auth_url = self.config["endpoints"]["auth_url"]
token_url = self.config["endpoints"]["token_url"]
code_verifier = self.generate_code_verifier()
code_challenge = self.generate_code_challenge(code_verifier)
redirect_url_request = self.session.get(
auth_url,
headers={
**self.session.headers,
},
params={
'client_id': 'rtlplus-web',
'redirect_uri': 'https://plus.rtl.de/silent-check-sso.html',
'response_type': 'code',
'scope': 'openid',
'code_challenge_method': 'S256',
'code_challenge': code_challenge
},
cookies=cookies,
)
redirect_url_request.raise_for_status()
redirect_url = redirect_url_request.url
auth_code_match = re.search(self.AUTH_CODE_REGEX, redirect_url)
if auth_code_match:
auth_code = auth_code_match.group(1)
self.log.debug(f"Auth Code: {auth_code}")
else:
self.log.error("Authorization code not found in redirect URL.")
raise EnvironmentError("Could not find authorization code.")
response = self.session.post(
token_url,
headers={
**self.session.headers,
'Content-Type': 'application/x-www-form-urlencoded'
},
cookies=cookies,
data=bytes(f'grant_type=authorization_code&client_id=rtlplus-web&redirect_uri=https%3A%2F%2Fplus.rtl.de%2Fsilent-check-sso.html&code={auth_code}&code_verifier={code_verifier}', 'utf-8'),
)
response.raise_for_status()
auth_response = response.json()
if 'access_token' in auth_response:
self._rtlp_auth_jwt = auth_response['access_token']
self.log.info("Successfully authenticated with cookies.")
else:
self.log.error("No access_token found in response.")
raise EnvironmentError("Cookie authentication failed: no access token in response.")
def _authenticate_anonymous(self) -> None:
token_url = self.config["endpoints"]["token_url"]
response = self.session.post(
token_url,
headers={
**self.session.headers,
'Content-Type': 'application/x-www-form-urlencoded'
},
data=bytes('grant_type=client_credentials&client_id=anonymous-user&client_secret=4bfeb73f-1c4a-4e9f-a7fa-96aa1ad3d94c', 'utf-8'),
)
response.raise_for_status()
auth_response = response.json()
if 'access_token' in auth_response:
self._rtlp_auth_jwt = auth_response['access_token']
self.log.info("Authenticated anonymously with RTL+ service successfully.")
else:
self.log.error("No access_token found in response.")
raise EnvironmentError("Anonymous authentication failed: no access token in response.")
# Required methods:
def get_titles(self) -> Union[Movies, Series]:
graphql_url = self.config["endpoints"]["graphql_url"]
try:
kind, show_id, season_id, episode_id = (
re.match(self.TITLE_RE, self.title).group(i) for i in ("kind", "show_id", "season_id", "episode_id")
)
except Exception:
raise ValueError("Could not parse ID from title - is the URL correct?")
if not kind or not show_id:
self.log.error(f"Invalid title URL: {self.title}. 'kind' or 'show_id' is missing.")
raise ValueError("Invalid title URL: 'kind' or 'show_id' is missing.")
if kind == "filme":
content_id = f'rrn:watch:videohub:movie:{show_id}'
response_data = self._execute_graphql_query('MovieDetail', {'id': content_id}, 'b1c360212cc518ddca2b8377813a54fa918ca424c08086204b7bf7d6ef626ac4')
if 'movie' in response_data:
movie_data = response_data['movie']
if 'id' not in movie_data or 'title' not in movie_data or 'productionYear' not in movie_data:
self.log.error("Invalid movie_data data received.")
raise ValueError("Invalid movie_data data received from RTL+ service.")
self.log.debug(f"Movie ID: {content_id}, Title: {movie_data['title']}")
return Movies(
[
Movie(
id_=content_id,
service=self.__class__,
name=movie_data['title'],
data=movie_data,
year=movie_data['productionYear'],
)
]
)
self.log.error(f"Failed to fetch Movie data: {response.status_code} - {response.text}")
raise EnvironmentError("Failed to fetch Movie data from RTL+ service.")
if kind == "shows" or kind == "serien":
if episode_id:
content_id = f'rrn:watch:videohub:episode:{episode_id}'
response_data = self._execute_graphql_query('EpisodeDetail', {'episodeId': content_id}, '2e5ef142c79f8620e8e93c8f21b31a463b16d89a557f7f5f0c4a7e063be96a8a')
if 'episode' in response_data:
episode_data = response_data['episode']
if 'id' not in episode_data or 'title' not in episode_data or 'number' not in episode_data or 'episodeSeason' not in episode_data:
self.log.error("Invalid episode data received.")
raise ValueError("Invalid episode data received from RTL+ service.")
if 'format' not in episode_data and 'title' not in episode_data['format']:
self.log.error("Invalid episode format received.")
raise ValueError("Invalid episode format received from RTL+ service.")
return Series(
[
Episode(
id_=content_id,
service=self.__class__,
title=episode_data['format']['title'],
season=self.get_episode_session(episode_data),
number=episode_data['number'],
name=episode_data['title'],
data=episode_data,
)
]
)
elif season_id:
content_id = f'rrn:watch:videohub:season:{season_id}'
response_data = self._execute_graphql_query('SeasonWithFormatAndEpisodes', {'seasonId': content_id}, 'cc0fbbe17143f549a35efa6f8665ceb9b1cfae44b590f0b2381a9a304304c584')
if 'season' in response_data:
season_data = response_data['season']
if 'format' not in season_data or 'title' not in season_data['format']:
self.log.error("Invalid season format received.")
raise ValueError("Invalid season format received from RTL+ service.")
if not 'episodes' in season_data or not isinstance(season_data['episodes'], list):
self.log.error("Invalid season data received.")
raise ValueError("Invalid season data received from RTL+ service.")
episodes = []
for episode in season_data['episodes']:
if 'id' not in episode or 'title' not in episode or 'number' not in episode or 'episodeSeason' not in episode:
self.log.error("Invalid episode data received.")
raise ValueError("Invalid episode data received from RTL+ service.")
episodes.append(
Episode(
id_=episode['id'],
service=self.__class__,
title=season_data['format']['title'],
season=self.get_episode_session(episode),
number=episode['number'],
name=episode['title'],
data=episode,
)
)
return Series(
episodes
)
elif show_id:
content_id = f'rrn:watch:videohub:format:{show_id}'
response_data = self._execute_graphql_query('Format', {'id': content_id}, 'd112638c0184ab5698af7b69532dfe2f12973f7af9cb137b9f70278130b1eafa')
if 'format' in response_data:
format_data = response_data['format']
if 'title' not in format_data or 'id' not in format_data:
self.log.error("Invalid format data received.")
raise ValueError("Invalid format data received from RTL+ service.")
if 'seasons' not in format_data or not isinstance(format_data['seasons'], list):
self.log.error("Invalid format seasons data received.")
raise ValueError("Invalid format seasons data received from RTL+ service.")
episodes = []
for season in format_data['seasons']:
if not 'id' in season or not 'seasonType' in season:
self.log.error("Invalid season data received.")
raise ValueError("Invalid season data received from RTL+ service.")
season_id = season['id']
response_data = self._execute_graphql_query('SeasonWithFormatAndEpisodes', {'seasonId': season_id}, 'cc0fbbe17143f549a35efa6f8665ceb9b1cfae44b590f0b2381a9a304304c584')
if 'season' in response_data:
season_data = response_data['season']
if 'format' not in season_data or 'title' not in season_data['format']:
self.log.error("Invalid season format received.")
raise ValueError("Invalid season format received from RTL+ service.")
if not 'episodes' in season_data or not isinstance(season_data['episodes'], list):
self.log.error("Invalid season data received.")
raise ValueError("Invalid season data received from RTL+ service.")
for episode in season_data['episodes']:
if 'id' not in episode or 'title' not in episode or 'number' not in episode or 'episodeSeason' not in episode:
self.log.error("Invalid episode data received.")
raise ValueError("Invalid episode data received from RTL+ service.")
episodes.append(
Episode(
id_=episode['id'],
service=self.__class__,
title=season_data['format']['title'],
season=self.get_episode_session(episode),
number=episode['number'],
name=episode['title'],
data=episode,
)
)
return Series(
episodes
)
self.log.error(f"Failed to fetch series data: {response.status_code} - {response.text}")
raise EnvironmentError("Failed to fetch series data from RTL+ service.")
def get_tracks(self, title: Union[Episode, Movie]) -> Tracks:
playout_url = self.config["endpoints"]["playout"]
if isinstance(title, Episode) or isinstance(title, Movie):
playout_url = playout_url.format(id=title.data['id'])
else:
self.log.error(f"Unsupported title type: {type(title)}. Expected Series or Movies.")
raise ValueError(f"Unsupported title type: {type(title)}. Expected Series or Movies.")
response = self.session.get(
playout_url,
headers={
**self.session.headers,
'x-auth-token': self._rtlp_auth_jwt
}
)
if response and response.status_code == 200:
response_data = response.json()
all_parsed_tracks = [] # Use a list to collect all individual track objects
for variant in response_data:
if 'name' not in variant:
self.log.error("Invalid playout variant data received.")
raise ValueError("Invalid playout variant data received from RTL+ service.")
# Assuming 'dashsd' and 'dashhd' variants contain the MPD URLs
if variant['name'] == 'dashhd':
if 'sources' not in variant and len(variant['sources']) == 0:
self.log.warning(f"Variant '{variant['name']}' has no sources. Skipping.")
continue
source_entry = variant['sources'][0]
# Assuming the 'url' key in each source_entry is the DASH manifest URL
if not 'url' in source_entry:
self.log.warning(f"DASH source entry missing 'url': {source_entry}. Skipping.")
continue
manifest_url = source_entry['url']
try:
all_parsed_tracks = DASH.from_url(manifest_url, self.session).to_tracks(language="de") # Use title's language for filtering/tagging
except Exception as e:
self.log.error(f"Failed to parse DASH manifest from {manifest_url}: {e}")
# Decide if you want to raise or just log and continue for other manifests
continue
# Return a new Tracks object containing all collected tracks
return Tracks(all_parsed_tracks)
else:
self.log.error(f"Failed to fetch tracks data: {response.status_code} - {response.text}")
raise EnvironmentError("Failed to fetch tracks data from RTL+ service.")
def get_chapters(self, title: Union[Movies, Series]) -> list[Chapter]:
# technically optional, but you must define and at least `return []`.
return Chapters()
def get_widevine_service_certificate(self, *, challenge: bytes, title: Union[Movies, Series], track: AnyTrack) -> Union[bytes, str]:
return None
def get_widevine_license(self, *, challenge: bytes, title: Union[Movies, Series], track: AnyTrack) -> Optional[Union[bytes, str]]:
license_url = self.config["endpoints"]["license"]
response = self.session.post(
license_url,
headers={
**self.session.headers,
'Content-Type': 'application/octet-stream',
'x-auth-token': self._rtlp_auth_jwt
},
data=challenge
)
if response.status_code != 200:
self.log.error(f"Failed to fetch license: {response.status_code} - {response.text}")
raise ConnectionError(response.text)
self.log.info("Successfully fetched Widevine license from RTL+ service.")
return response.content
def _execute_graphql_query(self, operation_name: str, variables: dict, persisted_query_hash: str) -> dict:
response = self.session.get(
self.config["endpoints"]["graphql_url"],
headers={
**self.session.headers,
'Authorization': f'Bearer {self._rtlp_auth_jwt}'
},
params={
'operationName': operation_name,
'variables': json.dumps(variables).encode(),
'extensions': json.dumps({
'persistedQuery': {'version': 1, 'sha256Hash': persisted_query_hash}
}).encode()
}
)
response.raise_for_status()
response_data = response.json()
if 'data' not in response_data:
self.log.error(f"GraphQL response for '{operation_name}' missing 'data' field.")
raise ValueError(f"Invalid GraphQL response for '{operation_name}'.")
return response_data['data']
def get_episode_session(self, episode) -> int:
season_value = None
if 'seasonType' not in episode['episodeSeason']:
self.log.error("Invalid episode season received.")
raise ValueError("Invalid episode season received from RTL+ service.")
seasonType = episode['episodeSeason']['seasonType']
if seasonType == 'ANNUAL':
if 'season' in episode['episodeSeason'] and 'year' in episode['episodeSeason']['season']:
season_value = int(episode['episodeSeason']['season']['year'])
elif seasonType == 'ORDINAL':
if 'season' in episode['episodeSeason'] and 'ordinal' in episode['episodeSeason']['season']:
season_value = int(episode['episodeSeason']['season']['ordinal'])
else:
self.log.error(f"Unknown season type '{seasonType}' received.")
raise ValueError(f"Unknown season type '{seasonType}' received from RTL+ service.")
return season_value

13
RTLP/config.yaml Normal file
View File

@@ -0,0 +1,13 @@
# This config file is automatically loaded into `self.config` class instance variable.
# I recommend storing information like any de-obfuscated keys, base hosts, endpoints,
# or other such configuration data.
endpoints:
auth_url: "https://auth.rtl.de/auth/realms/rtlplus/protocol/openid-connect/auth"
token_url: "https://auth.rtl.de/auth/realms/rtlplus/protocol/openid-connect/token"
graphql_url: "https://cdn.gateway.now-plus-prod.aws-cbc.cloud/graphql"
playout: "https://stus.player.streamingtech.de/watch-playout-variants/{id}?platform=web"
license: "https://rtlplus-widevine.streamingtech.de/index/rtlplus"
client:
id: "2a970b6d-adf2-4cf6-833f-9d940c300d09"