from __future__ import annotations import base64 import hashlib import json import re import secrets from http.cookiejar import MozillaCookieJar from typing import Any, Optional, Union import click import requests from devine.core.constants import AnyTrack from devine.core.credential import Credential from devine.core.manifests import DASH from devine.core.service import Service from devine.core.titles import Episode, Movie, Movies, Series from devine.core.tracks import Chapter, Chapters, Tracks class JOYNAT(Service): """Joyn Austria (joyn.at) streaming service.""" # List of Service Aliases. Do NOT include the Service Tag. All aliases must be lowercase. ALIASES = () # List of regions of which the service offers support for. GEOFENCE = ("at",) TITLE_RE = r"^https?:\/\/www\.joyn\.at\/(?:play\/)?(?Pfilme|serien|compilation)\/(?P.+)$" AUTH_CODE_REGEX = r"[&?]code=([^&]+)" @staticmethod @click.command(name="JOYN", short_help="https://joyn.at", help=__doc__) @click.argument("title", type=str) @click.option("--age-bypass", is_flag=True, default=False, help="Download age gated videos with a rating of 16 years old or above.") @click.pass_context def cli(ctx: click.Context, **kwargs: Any) -> JOYNAT: return JOYNAT(ctx, **kwargs) def __init__(self, ctx: click.Context, title: str, age_bypass: bool = False): self.title = title self.age_bypass = age_bypass super().__init__(ctx) def get_session(self) -> requests.Session: # modify the creation of the requests session (stored as self.session) # make a super() call to take the original result and further modify it, # or don't to make a completely fresh one if required. session = super().get_session() # Set default headers as specified session.headers.update({ 'Accept': '*/*', 'Accept-Language': 'de,de-DE;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6', 'Origin': 'https://www.joyn.at/', 'Referer': 'https://www.joyn.at/', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36 Edg/137.0.0.0', 'Joyn-Platform': 'web', 'Joyn-Distribution-Tenant': 'JOYN_AT', 'Joyn-Country': 'AT', 'Joyn-Client-Version': '5.1370.1', }) return session def authenticate(self, cookies: Optional[MozillaCookieJar] = None, credential: Optional[Credential] = None) -> None: super().authenticate(cookies, credential) # important if not cookies: self._authenticate_anonymous() else: self._authenticate_with_cookies(cookies) @staticmethod def _generate_code_verifier() -> str: """Generate a PKCE code verifier.""" return secrets.token_urlsafe(64) @staticmethod def _generate_code_challenge(verifier: str) -> str: """Generate a PKCE code challenge from a verifier.""" sha256_hash = hashlib.sha256(verifier.encode()).digest() return base64.urlsafe_b64encode(sha256_hash).decode().rstrip("=") def _authenticate_with_cookies(self, cookies: MozillaCookieJar) -> None: auth_url = self.config["endpoints"]["auth_url"] token_url = self.config["endpoints"]["token_url"] redirect_uri = self.config["endpoints"]["redirect_uri"] client_id = self.config["client"]["id"] code_verifier = self._generate_code_verifier() code_challenge = self._generate_code_challenge(code_verifier) redirect_url_request = self.session.get( auth_url, params={ 'response_type': 'code', 'scope': 'openid email profile offline_access', 'view_type': 'login', 'client_id': client_id, 'prompt': 'consent', 'response_mode': 'query', 'cmpUcId': '9464e7a80af12c8cbdfbf2117f07f410b65af6af04ff3eee58ea2754590dfc83', 'redirect_uri': redirect_uri, 'code_challenge': code_challenge, 'code_challenge_method': 'S256', }, ) redirect_url_request.raise_for_status() redirect_url = redirect_url_request.url # Find the auth_code using regex auth_code_match = re.search(self.AUTH_CODE_REGEX, redirect_url) if not auth_code_match: raise EnvironmentError("Authorization code not found in redirect URL.") auth_code = auth_code_match.group(1) self.log.debug(f"Auth Code: {auth_code}") response = self.session.post( token_url, headers={ 'Content-Type': 'application/json' }, cookies=cookies, json={ 'code': auth_code, 'client_id': client_id, 'redirect_uri': redirect_uri, 'code_verifier': code_verifier }, ) response.raise_for_status() auth_response = response.json() if 'access_token' not in auth_response: raise EnvironmentError("Cookie authentication failed: no access token in response.") self._joyn_auth_jwt = auth_response['access_token'] self.log.info("Successfully authenticated with cookies.") def _authenticate_anonymous(self) -> None: token_url = self.config["endpoints"]["anon_auth_url"] anon_device_id = self.config["client"]["anon_device_id"] client_name = self.config["client"]["name"] response = self.session.post( token_url, headers={ 'Content-Type': 'application/json', }, json={ 'client_id': anon_device_id, 'client_name': client_name, 'anon_device_id': anon_device_id, }, ) response.raise_for_status() auth_response = response.json() if 'access_token' not in auth_response: raise EnvironmentError("Anonymous authentication failed: no access token in response.") self._joyn_auth_jwt = auth_response['access_token'] self.log.info("Authenticated anonymously with Joyn service.") def _is_age_restricted(self, data: dict, title_name: str) -> bool: """Check if content is age-restricted and should be skipped. Returns True if the content has an age rating >= 16 and --age-bypass was not specified. """ age_rating = data.get('ageRating') or {} min_age = age_rating.get('minAge', 0) if min_age >= 16 and not self.age_bypass: self.log.warning(f"Skipping '{title_name}' due to age rating ({min_age}+). Use --age-bypass to download.") return True return False @staticmethod def _validate_required_fields(data: dict, fields: list[str], context: str) -> None: """Raise ValueError if any required fields are missing from data.""" missing = [f for f in fields if f not in data] if missing: raise ValueError(f"Missing required fields {missing} in {context}.") def _validate_video_field(self, data: dict, context: str) -> None: """Validate that a 'video.id' field exists in the data.""" if 'video' not in data or 'id' not in data['video']: raise ValueError(f"Missing 'video.id' in {context}.") # ------------------------------------------------------------------ # Required service methods # ------------------------------------------------------------------ def get_titles(self) -> Union[Movies, Series]: match = re.match(self.TITLE_RE, self.title) if not match: raise ValueError(f"Invalid title URL format: {self.title}") kind = match.group("type") content_id = match.group("content_id") if not kind or not content_id: raise ValueError(f"Invalid title URL: {self.title}. 'kind' or 'content_id' is missing.") if kind == "filme": return self._get_movie_titles(content_id) elif kind == "serien": return self._get_series_titles(content_id) raise ValueError(f"Unsupported content type: '{kind}'.") def _get_movie_titles(self, content_id: str) -> Movies: """Fetch and return movie title data.""" path = f'/filme/{content_id}' response_data = self._execute_graphql_query( 'PageMovieDetailNewStatic', {'path': path}, '7b49493138f2162be230fd0e3fbf5722b1db6700a8842109ed3d98979898707a', ) movie_data = response_data.get('page', {}).get('movie') if not movie_data: raise ValueError("Failed to fetch movie data from Joyn service.") self._validate_required_fields(movie_data, ['id', 'title', 'productionYear'], 'movie_data') self._validate_video_field(movie_data, 'movie_data') if self._is_age_restricted(movie_data, movie_data['title']): return Movies([]) return Movies([ Movie( id_=movie_data['id'], service=self.__class__, name=movie_data['title'], data=movie_data, year=movie_data['productionYear'], ) ]) def _get_series_titles(self, content_id: str) -> Series: """Fetch and return series/episode title data.""" path = f'/serien/{content_id}' parts = content_id.split("/") if len(parts) == 1: return self._get_full_series(path) elif len(parts) == 2: return self._get_single_episode(path) raise ValueError(f"Unexpected series URL depth: '{content_id}'.") def _get_full_series(self, path: str) -> Series: """Fetch all episodes across all seasons of a series.""" response_data = self._execute_graphql_query( 'SeriesDetailPageStatic', {'path': path}, '43cad327eeae12e14dfb629d662ebc947d78b71ec91d972ea1ef46ccdb29eede', ) series_data = response_data.get('page', {}).get('series') if not series_data: raise ValueError("Failed to fetch series data from Joyn service.") self._validate_required_fields(series_data, ['title', 'allSeasons'], 'series_data') episodes = [] for season in series_data['allSeasons']: self._validate_required_fields(season, ['id', 'number'], 'season') season_response = self._execute_graphql_query( 'Season', {'id': season['id']}, 'ee2396bb1b7c9f800e5cefd0b341271b7213fceb4ebe18d5a30dab41d703009f', ) season_data = season_response.get('season') if not season_data: continue self._validate_required_fields(season_data, ['episodes'], 'season_data') for episode in season_data['episodes']: self._validate_required_fields(episode, ['id', 'title', 'number'], 'episode') self._validate_video_field(episode, 'episode') if self._is_age_restricted(episode, episode['title']): continue episodes.append(Episode( id_=episode['id'], service=self.__class__, title=series_data['title'], season=season['number'], number=episode['number'], name=episode['title'], data=episode, )) return Series(episodes) def _get_single_episode(self, path: str) -> Series: """Fetch a single episode by its direct URL.""" response_data = self._execute_graphql_query( 'EpisodeDetailPageStatic', {'path': path}, 'c4bcacee94d38133e87879dad8d69bd8a74c7326262a1848cceb964b871c1551', ) episode_data = response_data.get('page', {}).get('episode') if not episode_data: raise ValueError("Failed to fetch episode data from Joyn service.") self._validate_required_fields(episode_data, ['id', 'title', 'number'], 'episode_data') self._validate_required_fields(episode_data.get('season', {}), ['number'], 'episode_data.season') self._validate_required_fields(episode_data.get('series', {}), ['title'], 'episode_data.series') self._validate_video_field(episode_data, 'episode_data') if self._is_age_restricted(episode_data, episode_data['title']): return Series([]) return Series([ Episode( id_=episode_data['id'], service=self.__class__, title=episode_data['series']['title'], season=episode_data['season']['number'], number=episode_data['number'], name=episode_data['title'], data=episode_data, ) ]) def get_tracks(self, title: Union[Episode, Movie]) -> Tracks: if not isinstance(title, (Episode, Movie)): raise TypeError(f"Expected Episode or Movie, got {type(title).__name__}.") content_id = title.data['video']['id'] # Step 1: Obtain entitlement token entitlement_data = self._fetch_entitlement(content_id) entitlement_token = entitlement_data['entitlement_token'] # Step 2: Obtain playlist/manifest playlist_data = self._fetch_playlist(content_id, entitlement_token) manifest_url = playlist_data['manifestUrl'] license_url = playlist_data.get('licenseUrl') # Step 3: Parse DASH manifest into tracks all_tracks = DASH.from_url(manifest_url, self.session).to_tracks(language="de") for track in all_tracks: if track.data is None: track.data = {} track.data['license_url'] = license_url return Tracks(all_tracks) def _fetch_entitlement(self, content_id: str) -> dict: """Request an entitlement token for the given content.""" response = self.session.post( self.config["endpoints"]["entitlement"], headers={ 'Authorization': f'Bearer {self._joyn_auth_jwt}', }, json={'content_id': content_id, 'content_type': 'VOD'}, ) response.raise_for_status() data = response.json() if 'entitlement_token' not in data: raise EnvironmentError("Entitlement response missing 'entitlement_token'.") return data def _fetch_playlist(self, content_id: str, entitlement_token: str) -> dict: """Request the playout/manifest URL for the given content.""" response = self.session.post( self.config["endpoints"]["playout"].format(content_id=content_id), headers={ 'Authorization': f'Bearer {entitlement_token}', }, json={ 'manufacturer': 'unknown', 'platform': 'browser', 'maxSecurityLevel': 1, 'streamingFormat': 'dash', 'model': 'unknown', 'protectionSystem': 'widevine', 'enableDolbyAudio': False, 'enableSubtitles': True, 'maxResolution': 1080, 'variantName': 'default', }, ) response.raise_for_status() data = response.json() if 'manifestUrl' not in data: raise EnvironmentError("Playlist response missing 'manifestUrl'.") return data def get_chapters(self, title: Union[Movies, Series]) -> list[Chapter]: # technically optional, but you must define and at least `return []`. return Chapters() def get_widevine_service_certificate(self, *, challenge: bytes, title: Union[Movies, Series], track: AnyTrack) -> Union[bytes, str]: return None def get_widevine_license( self, *, challenge: bytes, title: Union[Movies, Series], track: AnyTrack, ) -> Optional[Union[bytes, str]]: """Obtain a Widevine license for the given track.""" license_url = track.data.get('license_url') if not license_url: raise ValueError("No license_url in track.data.") response = self.session.post( license_url, headers={ 'Content-Type': 'application/octet-stream', 'x-auth-token': self._joyn_auth_jwt, }, data=challenge, ) response.raise_for_status() self.log.info("Successfully fetched Widevine license.") return response.content def _execute_graphql_query(self, operation_name: str, variables: dict, persisted_query_hash: str) -> dict: response = self.session.get( self.config["endpoints"]["graphql_url"], headers={ 'X-Api-Key': self.config["client"]["api_key"] }, params={ 'operationName': operation_name, 'variables': json.dumps(variables), 'extensions': json.dumps({ 'persistedQuery': {'version': 1, 'sha256Hash': persisted_query_hash} }).encode() } ) response.raise_for_status() response_data = response.json() if response_data.get("errors"): raise ValueError(f"GraphQL errors for '{operation_name}': {response_data['errors']}") if 'data' not in response_data: self.log.error(f"GraphQL response for '{operation_name}' missing 'data' field.") raise ValueError(f"Invalid GraphQL response for '{operation_name}'.") return response_data['data']