458 lines
18 KiB
Python
458 lines
18 KiB
Python
from __future__ import annotations
|
|
|
|
import base64
|
|
import hashlib
|
|
import json
|
|
import re
|
|
import secrets
|
|
from http.cookiejar import MozillaCookieJar
|
|
from typing import Any, Optional, Union
|
|
|
|
import click
|
|
import requests
|
|
|
|
from devine.core.constants import AnyTrack
|
|
from devine.core.credential import Credential
|
|
from devine.core.manifests import DASH
|
|
from devine.core.service import Service
|
|
from devine.core.titles import Episode, Movie, Movies, Series
|
|
from devine.core.tracks import Chapter, Chapters, Tracks
|
|
|
|
|
|
class JOYNAT(Service):
|
|
"""Joyn Austria (joyn.at) streaming service."""
|
|
# List of Service Aliases. Do NOT include the Service Tag. All aliases must be lowercase.
|
|
ALIASES = ()
|
|
|
|
# List of regions of which the service offers support for.
|
|
GEOFENCE = ("at",)
|
|
|
|
TITLE_RE = r"^https?:\/\/www\.joyn\.at\/(?:play\/)?(?P<type>filme|serien|compilation)\/(?P<content_id>.+)$"
|
|
AUTH_CODE_REGEX = r"[&?]code=([^&]+)"
|
|
|
|
@staticmethod
|
|
@click.command(name="JOYN", short_help="https://joyn.at", help=__doc__)
|
|
@click.argument("title", type=str)
|
|
@click.option("--age-bypass", is_flag=True, default=False, help="Download age gated videos with a rating of 16 years old or above.")
|
|
@click.pass_context
|
|
def cli(ctx: click.Context, **kwargs: Any) -> JOYNAT:
|
|
return JOYNAT(ctx, **kwargs)
|
|
|
|
def __init__(self, ctx: click.Context, title: str, age_bypass: bool = False):
|
|
self.title = title
|
|
self.age_bypass = age_bypass
|
|
|
|
super().__init__(ctx)
|
|
|
|
def get_session(self) -> requests.Session:
|
|
# modify the creation of the requests session (stored as self.session)
|
|
# make a super() call to take the original result and further modify it,
|
|
# or don't to make a completely fresh one if required.
|
|
|
|
session = super().get_session()
|
|
|
|
# Set default headers as specified
|
|
session.headers.update({
|
|
'Accept': '*/*',
|
|
'Accept-Language': 'de,de-DE;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
|
|
'Origin': 'https://www.joyn.at/',
|
|
'Referer': 'https://www.joyn.at/',
|
|
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36 Edg/137.0.0.0',
|
|
'Joyn-Platform': 'web',
|
|
'Joyn-Distribution-Tenant': 'JOYN_AT',
|
|
'Joyn-Country': 'AT',
|
|
'Joyn-Client-Version': '5.1370.1',
|
|
})
|
|
|
|
return session
|
|
|
|
def authenticate(self, cookies: Optional[MozillaCookieJar] = None, credential: Optional[Credential] = None) -> None:
|
|
super().authenticate(cookies, credential) # important
|
|
|
|
if not cookies:
|
|
self._authenticate_anonymous()
|
|
|
|
else:
|
|
self._authenticate_with_cookies(cookies)
|
|
|
|
@staticmethod
|
|
def _generate_code_verifier() -> str:
|
|
"""Generate a PKCE code verifier."""
|
|
return secrets.token_urlsafe(64)
|
|
|
|
@staticmethod
|
|
def _generate_code_challenge(verifier: str) -> str:
|
|
"""Generate a PKCE code challenge from a verifier."""
|
|
sha256_hash = hashlib.sha256(verifier.encode()).digest()
|
|
return base64.urlsafe_b64encode(sha256_hash).decode().rstrip("=")
|
|
|
|
def _authenticate_with_cookies(self, cookies: MozillaCookieJar) -> None:
|
|
auth_url = self.config["endpoints"]["auth_url"]
|
|
token_url = self.config["endpoints"]["token_url"]
|
|
redirect_uri = self.config["endpoints"]["redirect_uri"]
|
|
client_id = self.config["client"]["id"]
|
|
|
|
code_verifier = self._generate_code_verifier()
|
|
code_challenge = self._generate_code_challenge(code_verifier)
|
|
|
|
redirect_url_request = self.session.get(
|
|
auth_url,
|
|
params={
|
|
'response_type': 'code',
|
|
'scope': 'openid email profile offline_access',
|
|
'view_type': 'login',
|
|
'client_id': client_id,
|
|
'prompt': 'consent',
|
|
'response_mode': 'query',
|
|
'cmpUcId': '9464e7a80af12c8cbdfbf2117f07f410b65af6af04ff3eee58ea2754590dfc83',
|
|
'redirect_uri': redirect_uri,
|
|
'code_challenge': code_challenge,
|
|
'code_challenge_method': 'S256',
|
|
},
|
|
)
|
|
redirect_url_request.raise_for_status()
|
|
|
|
redirect_url = redirect_url_request.url
|
|
|
|
# Find the auth_code using regex
|
|
auth_code_match = re.search(self.AUTH_CODE_REGEX, redirect_url)
|
|
if not auth_code_match:
|
|
raise EnvironmentError("Authorization code not found in redirect URL.")
|
|
|
|
auth_code = auth_code_match.group(1)
|
|
self.log.debug(f"Auth Code: {auth_code}")
|
|
|
|
response = self.session.post(
|
|
token_url,
|
|
headers={
|
|
'Content-Type': 'application/json'
|
|
},
|
|
cookies=cookies,
|
|
json={
|
|
'code': auth_code,
|
|
'client_id': client_id,
|
|
'redirect_uri': redirect_uri,
|
|
'code_verifier': code_verifier
|
|
},
|
|
)
|
|
response.raise_for_status()
|
|
|
|
auth_response = response.json()
|
|
if 'access_token' not in auth_response:
|
|
raise EnvironmentError("Cookie authentication failed: no access token in response.")
|
|
|
|
self._joyn_auth_jwt = auth_response['access_token']
|
|
self.log.info("Successfully authenticated with cookies.")
|
|
|
|
|
|
def _authenticate_anonymous(self) -> None:
|
|
token_url = self.config["endpoints"]["anon_auth_url"]
|
|
anon_device_id = self.config["client"]["anon_device_id"]
|
|
client_name = self.config["client"]["name"]
|
|
|
|
response = self.session.post(
|
|
token_url,
|
|
headers={
|
|
'Content-Type': 'application/json',
|
|
},
|
|
json={
|
|
'client_id': anon_device_id,
|
|
'client_name': client_name,
|
|
'anon_device_id': anon_device_id,
|
|
},
|
|
)
|
|
response.raise_for_status()
|
|
|
|
auth_response = response.json()
|
|
if 'access_token' not in auth_response:
|
|
raise EnvironmentError("Anonymous authentication failed: no access token in response.")
|
|
|
|
self._joyn_auth_jwt = auth_response['access_token']
|
|
self.log.info("Authenticated anonymously with Joyn service.")
|
|
|
|
|
|
def _is_age_restricted(self, data: dict, title_name: str) -> bool:
|
|
"""Check if content is age-restricted and should be skipped.
|
|
|
|
Returns True if the content has an age rating >= 16 and --age-bypass
|
|
was not specified.
|
|
"""
|
|
age_rating = data.get('ageRating') or {}
|
|
min_age = age_rating.get('minAge', 0)
|
|
if min_age >= 16 and not self.age_bypass:
|
|
self.log.warning(f"Skipping '{title_name}' due to age rating ({min_age}+). Use --age-bypass to download.")
|
|
return True
|
|
return False
|
|
|
|
@staticmethod
|
|
def _validate_required_fields(data: dict, fields: list[str], context: str) -> None:
|
|
"""Raise ValueError if any required fields are missing from data."""
|
|
missing = [f for f in fields if f not in data]
|
|
if missing:
|
|
raise ValueError(f"Missing required fields {missing} in {context}.")
|
|
|
|
def _validate_video_field(self, data: dict, context: str) -> None:
|
|
"""Validate that a 'video.id' field exists in the data."""
|
|
if 'video' not in data or 'id' not in data['video']:
|
|
raise ValueError(f"Missing 'video.id' in {context}.")
|
|
|
|
# ------------------------------------------------------------------
|
|
# Required service methods
|
|
# ------------------------------------------------------------------
|
|
|
|
def get_titles(self) -> Union[Movies, Series]:
|
|
match = re.match(self.TITLE_RE, self.title)
|
|
if not match:
|
|
raise ValueError(f"Invalid title URL format: {self.title}")
|
|
|
|
kind = match.group("type")
|
|
content_id = match.group("content_id")
|
|
|
|
if not kind or not content_id:
|
|
raise ValueError(f"Invalid title URL: {self.title}. 'kind' or 'content_id' is missing.")
|
|
|
|
if kind == "filme":
|
|
return self._get_movie_titles(content_id)
|
|
elif kind == "serien":
|
|
return self._get_series_titles(content_id)
|
|
|
|
raise ValueError(f"Unsupported content type: '{kind}'.")
|
|
|
|
def _get_movie_titles(self, content_id: str) -> Movies:
|
|
"""Fetch and return movie title data."""
|
|
path = f'/filme/{content_id}'
|
|
response_data = self._execute_graphql_query(
|
|
'PageMovieDetailNewStatic', {'path': path},
|
|
'7b49493138f2162be230fd0e3fbf5722b1db6700a8842109ed3d98979898707a',
|
|
)
|
|
|
|
movie_data = response_data.get('page', {}).get('movie')
|
|
if not movie_data:
|
|
raise ValueError("Failed to fetch movie data from Joyn service.")
|
|
|
|
self._validate_required_fields(movie_data, ['id', 'title', 'productionYear'], 'movie_data')
|
|
self._validate_video_field(movie_data, 'movie_data')
|
|
|
|
if self._is_age_restricted(movie_data, movie_data['title']):
|
|
return Movies([])
|
|
|
|
return Movies([
|
|
Movie(
|
|
id_=movie_data['id'],
|
|
service=self.__class__,
|
|
name=movie_data['title'],
|
|
data=movie_data,
|
|
year=movie_data['productionYear'],
|
|
)
|
|
])
|
|
|
|
def _get_series_titles(self, content_id: str) -> Series:
|
|
"""Fetch and return series/episode title data."""
|
|
path = f'/serien/{content_id}'
|
|
parts = content_id.split("/")
|
|
|
|
if len(parts) == 1:
|
|
return self._get_full_series(path)
|
|
elif len(parts) == 2:
|
|
return self._get_single_episode(path)
|
|
|
|
raise ValueError(f"Unexpected series URL depth: '{content_id}'.")
|
|
|
|
def _get_full_series(self, path: str) -> Series:
|
|
"""Fetch all episodes across all seasons of a series."""
|
|
response_data = self._execute_graphql_query(
|
|
'SeriesDetailPageStatic', {'path': path},
|
|
'43cad327eeae12e14dfb629d662ebc947d78b71ec91d972ea1ef46ccdb29eede',
|
|
)
|
|
|
|
series_data = response_data.get('page', {}).get('series')
|
|
if not series_data:
|
|
raise ValueError("Failed to fetch series data from Joyn service.")
|
|
|
|
self._validate_required_fields(series_data, ['title', 'allSeasons'], 'series_data')
|
|
|
|
episodes = []
|
|
for season in series_data['allSeasons']:
|
|
self._validate_required_fields(season, ['id', 'number'], 'season')
|
|
|
|
season_response = self._execute_graphql_query(
|
|
'Season', {'id': season['id']},
|
|
'ee2396bb1b7c9f800e5cefd0b341271b7213fceb4ebe18d5a30dab41d703009f',
|
|
)
|
|
|
|
season_data = season_response.get('season')
|
|
if not season_data:
|
|
continue
|
|
|
|
self._validate_required_fields(season_data, ['episodes'], 'season_data')
|
|
|
|
for episode in season_data['episodes']:
|
|
self._validate_required_fields(episode, ['id', 'title', 'number'], 'episode')
|
|
self._validate_video_field(episode, 'episode')
|
|
|
|
if self._is_age_restricted(episode, episode['title']):
|
|
continue
|
|
|
|
episodes.append(Episode(
|
|
id_=episode['id'],
|
|
service=self.__class__,
|
|
title=series_data['title'],
|
|
season=season['number'],
|
|
number=episode['number'],
|
|
name=episode['title'],
|
|
data=episode,
|
|
))
|
|
|
|
return Series(episodes)
|
|
|
|
def _get_single_episode(self, path: str) -> Series:
|
|
"""Fetch a single episode by its direct URL."""
|
|
response_data = self._execute_graphql_query(
|
|
'EpisodeDetailPageStatic', {'path': path},
|
|
'c4bcacee94d38133e87879dad8d69bd8a74c7326262a1848cceb964b871c1551',
|
|
)
|
|
|
|
episode_data = response_data.get('page', {}).get('episode')
|
|
if not episode_data:
|
|
raise ValueError("Failed to fetch episode data from Joyn service.")
|
|
|
|
self._validate_required_fields(episode_data, ['id', 'title', 'number'], 'episode_data')
|
|
self._validate_required_fields(episode_data.get('season', {}), ['number'], 'episode_data.season')
|
|
self._validate_required_fields(episode_data.get('series', {}), ['title'], 'episode_data.series')
|
|
self._validate_video_field(episode_data, 'episode_data')
|
|
|
|
if self._is_age_restricted(episode_data, episode_data['title']):
|
|
return Series([])
|
|
|
|
return Series([
|
|
Episode(
|
|
id_=episode_data['id'],
|
|
service=self.__class__,
|
|
title=episode_data['series']['title'],
|
|
season=episode_data['season']['number'],
|
|
number=episode_data['number'],
|
|
name=episode_data['title'],
|
|
data=episode_data,
|
|
)
|
|
])
|
|
|
|
|
|
def get_tracks(self, title: Union[Episode, Movie]) -> Tracks:
|
|
if not isinstance(title, (Episode, Movie)):
|
|
raise TypeError(f"Expected Episode or Movie, got {type(title).__name__}.")
|
|
|
|
content_id = title.data['video']['id']
|
|
|
|
# Step 1: Obtain entitlement token
|
|
entitlement_data = self._fetch_entitlement(content_id)
|
|
entitlement_token = entitlement_data['entitlement_token']
|
|
|
|
# Step 2: Obtain playlist/manifest
|
|
playlist_data = self._fetch_playlist(content_id, entitlement_token)
|
|
manifest_url = playlist_data['manifestUrl']
|
|
license_url = playlist_data.get('licenseUrl')
|
|
|
|
# Step 3: Parse DASH manifest into tracks
|
|
all_tracks = DASH.from_url(manifest_url, self.session).to_tracks(language="de")
|
|
|
|
for track in all_tracks:
|
|
if track.data is None:
|
|
track.data = {}
|
|
track.data['license_url'] = license_url
|
|
|
|
return Tracks(all_tracks)
|
|
|
|
def _fetch_entitlement(self, content_id: str) -> dict:
|
|
"""Request an entitlement token for the given content."""
|
|
response = self.session.post(
|
|
self.config["endpoints"]["entitlement"],
|
|
headers={
|
|
'Authorization': f'Bearer {self._joyn_auth_jwt}',
|
|
},
|
|
json={'content_id': content_id, 'content_type': 'VOD'},
|
|
)
|
|
response.raise_for_status()
|
|
|
|
data = response.json()
|
|
if 'entitlement_token' not in data:
|
|
raise EnvironmentError("Entitlement response missing 'entitlement_token'.")
|
|
return data
|
|
|
|
def _fetch_playlist(self, content_id: str, entitlement_token: str) -> dict:
|
|
"""Request the playout/manifest URL for the given content."""
|
|
response = self.session.post(
|
|
self.config["endpoints"]["playout"].format(content_id=content_id),
|
|
headers={
|
|
'Authorization': f'Bearer {entitlement_token}',
|
|
},
|
|
json={
|
|
'manufacturer': 'unknown',
|
|
'platform': 'browser',
|
|
'maxSecurityLevel': 1,
|
|
'streamingFormat': 'dash',
|
|
'model': 'unknown',
|
|
'protectionSystem': 'widevine',
|
|
'enableDolbyAudio': False,
|
|
'enableSubtitles': True,
|
|
'maxResolution': 1080,
|
|
'variantName': 'default',
|
|
},
|
|
)
|
|
response.raise_for_status()
|
|
|
|
data = response.json()
|
|
if 'manifestUrl' not in data:
|
|
raise EnvironmentError("Playlist response missing 'manifestUrl'.")
|
|
return data
|
|
|
|
def get_chapters(self, title: Union[Movies, Series]) -> list[Chapter]:
|
|
# technically optional, but you must define and at least `return []`.
|
|
return Chapters()
|
|
|
|
def get_widevine_service_certificate(self, *, challenge: bytes, title: Union[Movies, Series], track: AnyTrack) -> Union[bytes, str]:
|
|
return None
|
|
|
|
def get_widevine_license(
|
|
self, *, challenge: bytes, title: Union[Movies, Series], track: AnyTrack,
|
|
) -> Optional[Union[bytes, str]]:
|
|
"""Obtain a Widevine license for the given track."""
|
|
license_url = getattr(track, 'data', {}).get('license_url') if track.data else None
|
|
if not license_url:
|
|
raise ValueError("No license_url in track.data.")
|
|
|
|
response = self.session.post(
|
|
license_url,
|
|
headers={
|
|
'Content-Type': 'application/octet-stream',
|
|
'x-auth-token': self._joyn_auth_jwt,
|
|
},
|
|
data=challenge,
|
|
)
|
|
response.raise_for_status()
|
|
|
|
self.log.info("Successfully fetched Widevine license.")
|
|
return response.content
|
|
|
|
def _execute_graphql_query(self, operation_name: str, variables: dict, persisted_query_hash: str) -> dict:
|
|
response = self.session.get(
|
|
self.config["endpoints"]["graphql_url"],
|
|
headers={
|
|
'X-Api-Key': self.config["client"]["api_key"]
|
|
},
|
|
params={
|
|
'operationName': operation_name,
|
|
'variables': json.dumps(variables).encode(),
|
|
'extensions': json.dumps({
|
|
'persistedQuery': {'version': 1, 'sha256Hash': persisted_query_hash}
|
|
}).encode()
|
|
}
|
|
)
|
|
response.raise_for_status()
|
|
|
|
response_data = response.json()
|
|
|
|
if 'data' not in response_data:
|
|
self.log.error(f"GraphQL response for '{operation_name}' missing 'data' field.")
|
|
raise ValueError(f"Invalid GraphQL response for '{operation_name}'.")
|
|
|
|
return response_data['data'] |