Files
devine_services/JOYNDE/__init__.py

461 lines
17 KiB
Python

from __future__ import annotations
import base64
import hashlib
import json
import re
import secrets
from http.cookiejar import MozillaCookieJar
from typing import Any, Optional, Union
import click
import requests
from devine.core.constants import AnyTrack
from devine.core.credential import Credential
from devine.core.manifests import DASH
from devine.core.service import Service
from devine.core.titles import Episode, Movie, Movies, Series
from devine.core.tracks import Chapter, Chapters, Tracks
class JOYNDE(Service):
"""Joyn Germmany (joyn.de) streaming service."""
# List of Service Aliases. Do NOT include the Service Tag. All aliases must be lowercase.
ALIASES = ()
# List of regions of which the service offers support for.
GEOFENCE = ("de",)
TITLE_RE = r"^https?:\/\/www\.joyn\.de\/(?:play\/)?(?P<type>filme|serien|compilation)\/(?P<content_id>.+)$"
AUTH_CODE_REGEX = r"[&?]code=([^&]+)"
@staticmethod
@click.command(name="JOYN", short_help="https://joyn.de", help=__doc__)
@click.argument("title", type=str)
@click.option("--age-bypass", is_flag=True, default=False, help="Download age gated videos with a rating of 16 years old or above.")
@click.pass_context
def cli(ctx: click.Context, **kwargs: Any) -> JOYNDE:
return JOYNDE(ctx, **kwargs)
def __init__(self, ctx: click.Context, title: str, age_bypass: bool = False):
self.title = title
self.age_bypass = age_bypass
super().__init__(ctx)
def get_session(self) -> requests.Session:
# modify the creation of the requests session (stored as self.session)
# make a super() call to take the original result and further modify it,
# or don't to make a completely fresh one if required.
session = super().get_session()
# Set default headers as specified
session.headers.update({
'Accept': '*/*',
'Accept-Language': 'de,de-DE;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
'Origin': 'https://www.joyn.de/',
'Referer': 'https://www.joyn.de/',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36 Edg/137.0.0.0',
'Joyn-Platform': 'web',
'Joyn-Distribution-Tenant': 'JOYN_DE',
'Joyn-Country': 'AT',
'Joyn-Client-Version': '5.1370.1',
})
return session
def authenticate(self, cookies: Optional[MozillaCookieJar] = None, credential: Optional[Credential] = None) -> None:
super().authenticate(cookies, credential) # important
if not cookies:
self._authenticate_anonymous()
else:
self._authenticate_with_cookies(cookies)
@staticmethod
def _generate_code_verifier() -> str:
"""Generate a PKCE code verifier."""
return secrets.token_urlsafe(64)
@staticmethod
def _generate_code_challenge(verifier: str) -> str:
"""Generate a PKCE code challenge from a verifier."""
sha256_hash = hashlib.sha256(verifier.encode()).digest()
return base64.urlsafe_b64encode(sha256_hash).decode().rstrip("=")
def _authenticate_with_cookies(self, cookies: MozillaCookieJar) -> None:
auth_url = self.config["endpoints"]["auth_url"]
token_url = self.config["endpoints"]["token_url"]
redirect_uri = self.config["endpoints"]["redirect_uri"]
client_id = self.config["client"]["id"]
code_verifier = self._generate_code_verifier()
code_challenge = self._generate_code_challenge(code_verifier)
redirect_url_request = self.session.get(
auth_url,
params={
'response_type': 'code',
'scope': 'openid email profile offline_access',
'view_type': 'login',
'client_id': client_id,
'prompt': 'consent',
'response_mode': 'query',
'cmpUcId': '9464e7a80af12c8cbdfbf2117f07f410b65af6af04ff3eee58ea2754590dfc83',
'redirect_uri': redirect_uri,
'code_challenge': code_challenge,
'code_challenge_method': 'S256',
},
)
redirect_url_request.raise_for_status()
redirect_url = redirect_url_request.url
# Find the auth_code using regex
auth_code_match = re.search(self.AUTH_CODE_REGEX, redirect_url)
if not auth_code_match:
raise EnvironmentError("Authorization code not found in redirect URL.")
auth_code = auth_code_match.group(1)
self.log.debug(f"Auth Code: {auth_code}")
response = self.session.post(
token_url,
headers={
'Content-Type': 'application/json'
},
cookies=cookies,
json={
'code': auth_code,
'client_id': client_id,
'redirect_uri': redirect_uri,
'code_verifier': code_verifier
},
)
response.raise_for_status()
auth_response = response.json()
if 'access_token' not in auth_response:
raise EnvironmentError("Cookie authentication failed: no access token in response.")
self._joyn_auth_jwt = auth_response['access_token']
self.log.info("Successfully authenticated with cookies.")
def _authenticate_anonymous(self) -> None:
token_url = self.config["endpoints"]["anon_auth_url"]
anon_device_id = self.config["client"]["anon_device_id"]
client_name = self.config["client"]["name"]
response = self.session.post(
token_url,
headers={
'Content-Type': 'application/json',
},
json={
'client_id': anon_device_id,
'client_name': client_name,
'anon_device_id': anon_device_id,
},
)
response.raise_for_status()
auth_response = response.json()
if 'access_token' not in auth_response:
raise EnvironmentError("Anonymous authentication failed: no access token in response.")
self._joyn_auth_jwt = auth_response['access_token']
self.log.info("Authenticated anonymously with Joyn service.")
def _is_age_restricted(self, data: dict, title_name: str) -> bool:
"""Check if content is age-restricted and should be skipped.
Returns True if the content has an age rating >= 16 and --age-bypass
was not specified.
"""
age_rating = data.get('ageRating') or {}
min_age = age_rating.get('minAge', 0)
if min_age >= 16 and not self.age_bypass:
self.log.warning(f"Skipping '{title_name}' due to age rating ({min_age}+). Use --age-bypass to download.")
return True
return False
@staticmethod
def _validate_required_fields(data: dict, fields: list[str], context: str) -> None:
"""Raise ValueError if any required fields are missing from data."""
missing = [f for f in fields if f not in data]
if missing:
raise ValueError(f"Missing required fields {missing} in {context}.")
def _validate_video_field(self, data: dict, context: str) -> None:
"""Validate that a 'video.id' field exists in the data."""
if 'video' not in data or 'id' not in data['video']:
raise ValueError(f"Missing 'video.id' in {context}.")
# ------------------------------------------------------------------
# Required service methods
# ------------------------------------------------------------------
def get_titles(self) -> Union[Movies, Series]:
match = re.match(self.TITLE_RE, self.title)
if not match:
raise ValueError(f"Invalid title URL format: {self.title}")
kind = match.group("type")
content_id = match.group("content_id")
if not kind or not content_id:
raise ValueError(f"Invalid title URL: {self.title}. 'kind' or 'content_id' is missing.")
if kind == "filme":
return self._get_movie_titles(content_id)
elif kind == "serien":
return self._get_series_titles(content_id)
raise ValueError(f"Unsupported content type: '{kind}'.")
def _get_movie_titles(self, content_id: str) -> Movies:
"""Fetch and return movie title data."""
path = f'/filme/{content_id}'
response_data = self._execute_graphql_query(
'PageMovieDetailNewStatic', {'path': path},
'7b49493138f2162be230fd0e3fbf5722b1db6700a8842109ed3d98979898707a',
)
movie_data = response_data.get('page', {}).get('movie')
if not movie_data:
raise ValueError("Failed to fetch movie data from Joyn service.")
self._validate_required_fields(movie_data, ['id', 'title', 'productionYear'], 'movie_data')
self._validate_video_field(movie_data, 'movie_data')
if self._is_age_restricted(movie_data, movie_data['title']):
return Movies([])
return Movies([
Movie(
id_=movie_data['id'],
service=self.__class__,
name=movie_data['title'],
data=movie_data,
year=movie_data['productionYear'],
)
])
def _get_series_titles(self, content_id: str) -> Series:
"""Fetch and return series/episode title data."""
path = f'/serien/{content_id}'
parts = content_id.split("/")
if len(parts) == 1:
return self._get_full_series(path)
elif len(parts) == 2:
return self._get_single_episode(path)
raise ValueError(f"Unexpected series URL depth: '{content_id}'.")
def _get_full_series(self, path: str) -> Series:
"""Fetch all episodes across all seasons of a series."""
response_data = self._execute_graphql_query(
'SeriesDetailPageStatic', {'path': path},
'43cad327eeae12e14dfb629d662ebc947d78b71ec91d972ea1ef46ccdb29eede',
)
series_data = response_data.get('page', {}).get('series')
if not series_data:
raise ValueError("Failed to fetch series data from Joyn service.")
self._validate_required_fields(series_data, ['title', 'allSeasons'], 'series_data')
episodes = []
for season in series_data['allSeasons']:
self._validate_required_fields(season, ['id', 'number'], 'season')
season_response = self._execute_graphql_query(
'Season', {'id': season['id']},
'ee2396bb1b7c9f800e5cefd0b341271b7213fceb4ebe18d5a30dab41d703009f',
)
season_data = season_response.get('season')
if not season_data:
continue
self._validate_required_fields(season_data, ['episodes'], 'season_data')
for episode in season_data['episodes']:
self._validate_required_fields(episode, ['id', 'title', 'number'], 'episode')
self._validate_video_field(episode, 'episode')
if self._is_age_restricted(episode, episode['title']):
continue
episodes.append(Episode(
id_=episode['id'],
service=self.__class__,
title=series_data['title'],
season=season['number'],
number=episode['number'],
name=episode['title'],
data=episode,
))
return Series(episodes)
def _get_single_episode(self, path: str) -> Series:
"""Fetch a single episode by its direct URL."""
response_data = self._execute_graphql_query(
'EpisodeDetailPageStatic', {'path': path},
'c4bcacee94d38133e87879dad8d69bd8a74c7326262a1848cceb964b871c1551',
)
episode_data = response_data.get('page', {}).get('episode')
if not episode_data:
raise ValueError("Failed to fetch episode data from Joyn service.")
self._validate_required_fields(episode_data, ['id', 'title', 'number'], 'episode_data')
self._validate_required_fields(episode_data.get('season', {}), ['number'], 'episode_data.season')
self._validate_required_fields(episode_data.get('series', {}), ['title'], 'episode_data.series')
self._validate_video_field(episode_data, 'episode_data')
if self._is_age_restricted(episode_data, episode_data['title']):
return Series([])
return Series([
Episode(
id_=episode_data['id'],
service=self.__class__,
title=episode_data['series']['title'],
season=episode_data['season']['number'],
number=episode_data['number'],
name=episode_data['title'],
data=episode_data,
)
])
def get_tracks(self, title: Union[Episode, Movie]) -> Tracks:
if not isinstance(title, (Episode, Movie)):
raise TypeError(f"Expected Episode or Movie, got {type(title).__name__}.")
content_id = title.data['video']['id']
# Step 1: Obtain entitlement token
entitlement_data = self._fetch_entitlement(content_id)
entitlement_token = entitlement_data['entitlement_token']
# Step 2: Obtain playlist/manifest
playlist_data = self._fetch_playlist(content_id, entitlement_token)
manifest_url = playlist_data['manifestUrl']
license_url = playlist_data.get('licenseUrl')
# Step 3: Parse DASH manifest into tracks
all_tracks = DASH.from_url(manifest_url, self.session).to_tracks(language="de")
for track in all_tracks:
if track.data is None:
track.data = {}
track.data['license_url'] = license_url
return Tracks(all_tracks)
def _fetch_entitlement(self, content_id: str) -> dict:
"""Request an entitlement token for the given content."""
response = self.session.post(
self.config["endpoints"]["entitlement"],
headers={
'Authorization': f'Bearer {self._joyn_auth_jwt}',
},
json={'content_id': content_id, 'content_type': 'VOD'},
)
response.raise_for_status()
data = response.json()
if 'entitlement_token' not in data:
raise EnvironmentError("Entitlement response missing 'entitlement_token'.")
return data
def _fetch_playlist(self, content_id: str, entitlement_token: str) -> dict:
"""Request the playout/manifest URL for the given content."""
response = self.session.post(
self.config["endpoints"]["playout"].format(content_id=content_id),
headers={
'Authorization': f'Bearer {entitlement_token}',
},
json={
'manufacturer': 'unknown',
'platform': 'browser',
'maxSecurityLevel': 1,
'streamingFormat': 'dash',
'model': 'unknown',
'protectionSystem': 'widevine',
'enableDolbyAudio': False,
'enableSubtitles': True,
'maxResolution': 1080,
'variantName': 'default',
},
)
response.raise_for_status()
data = response.json()
if 'manifestUrl' not in data:
raise EnvironmentError("Playlist response missing 'manifestUrl'.")
return data
def get_chapters(self, title: Union[Movies, Series]) -> list[Chapter]:
# technically optional, but you must define and at least `return []`.
return Chapters()
def get_widevine_service_certificate(self, *, challenge: bytes, title: Union[Movies, Series], track: AnyTrack) -> Union[bytes, str]:
return None
def get_widevine_license(
self, *, challenge: bytes, title: Union[Movies, Series], track: AnyTrack,
) -> Optional[Union[bytes, str]]:
"""Obtain a Widevine license for the given track."""
license_url = track.data.get('license_url')
if not license_url:
raise ValueError("No license_url in track.data.")
response = self.session.post(
license_url,
headers={
'Content-Type': 'application/octet-stream',
'x-auth-token': self._joyn_auth_jwt,
},
data=challenge,
)
response.raise_for_status()
self.log.info("Successfully fetched Widevine license.")
return response.content
def _execute_graphql_query(self, operation_name: str, variables: dict, persisted_query_hash: str) -> dict:
response = self.session.get(
self.config["endpoints"]["graphql_url"],
headers={
'X-Api-Key': self.config["client"]["api_key"]
},
params={
'operationName': operation_name,
'variables': json.dumps(variables),
'extensions': json.dumps({
'persistedQuery': {'version': 1, 'sha256Hash': persisted_query_hash}
}).encode()
}
)
response.raise_for_status()
response_data = response.json()
if response_data.get("errors"):
raise ValueError(f"GraphQL errors for '{operation_name}': {response_data['errors']}")
if 'data' not in response_data:
self.log.error(f"GraphQL response for '{operation_name}' missing 'data' field.")
raise ValueError(f"Invalid GraphQL response for '{operation_name}'.")
return response_data['data']