from __future__ import annotations

import base64
import json
import re
from http.cookiejar import MozillaCookieJar
from typing import Any

import click
import requests
from devine.core.constants import AnyTrack
from devine.core.credential import Credential
from devine.core.manifests import DASH
from devine.core.service import Service
from devine.core.titles import Episode, Movie, Movies, Series
from devine.core.tracks import Chapter, Chapters, Subtitle, Tracks


class AMZN(Service):
    """Amazon Prime Video streaming service."""

    # List of Service Aliases. Do NOT include the Service Tag. All aliases must be lowercase.
    ALIASES = ()
    # List of regions the service supports.
    GEOFENCE = ("de", "at", "ch")
    # Matches a primevideo.com detail URL (optionally with a "/-/<locale>/" prefix) and
    # captures the content ID in the named group "id", which get_titles() reads.
    TITLE_RE = r"^https?:\/\/(?:www\.)?primevideo\.com\/(?:-\/[a-zA-Z-]{2,5}\/)?detail\/(?P<id>[A-Z0-9]+)(?:[\/\?].*)?$"

    @staticmethod
    @click.command(name="AMZN", short_help="https://primevideo.com", help=__doc__)
    @click.argument("title", type=str)
    @click.pass_context
    def cli(ctx: click.Context, **kwargs: Any) -> AMZN:
        """Click entry point: build the service from the parsed CLI arguments."""
        return AMZN(ctx, **kwargs)

    def __init__(self, ctx: click.Context, title: str) -> None:
        """Store the title URL given on the command line and initialise the base service."""
        self.title = title
        # Content ID parsed from the URL; populated by get_titles().
        self._title_id: str | None = None
        super().__init__(ctx)

    # ------------------------------------------------------------------
    # Session / Authentication
    # ------------------------------------------------------------------

    def get_session(self) -> requests.Session:
        """Return the base session with browser-like request headers applied."""
        session = super().get_session()
        session.headers.update({
            "Accept": "application/json",
            "Sec-Fetch-Dest": "empty",
            "Sec-Fetch-Mode": "cors",
            "Sec-Fetch-Site": "same-site",
            "User-Agent": (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/146.0.0.0 Safari/537.36 Edg/146.0.0.0"
            ),
        })
        return session

    def authenticate(
        self,
        cookies: MozillaCookieJar | None = None,
        credential: Credential | None = None,
    ) -> None:
        """Authenticate with cookies and cache the device ID and customer config.

        Raises:
            ValueError: If no cookies are supplied, if the device ID cannot be
                found in the session cookies, or if the customer config is
                missing from the startup config response.
        """
        super().authenticate(cookies, credential)
        if not cookies:
            raise ValueError(
                "Authentication for Amazon Prime Video requires cookies. "
                "Please provide a MozillaCookieJar with the necessary cookies."
            )
        # Order matters: fetching the customer config needs self.device_id.
        self.device_id = self._get_device_id()
        self.customer_config = self._get_customer_config()

    # ------------------------------------------------------------------
    # Device / Customer config helpers
    # ------------------------------------------------------------------

    def _get_device_id(self) -> str:
        """Extract the DeviceId from the av-native-app cookie.

        Cookie format: ``AppName=PVPWA-MICROSOFT_STORE|DeviceId=<device-id>``

        Raises:
            ValueError: If no ``av-native-app`` cookie carries a non-empty DeviceId.
        """
        for cookie in self.session.cookies:
            if cookie.name != "av-native-app":
                continue
            # A cookie's value may be None; treat that as an empty value.
            pairs = (cookie.value or "").split("|")
            parts = dict(pair.split("=", 1) for pair in pairs if "=" in pair)
            device_id = parts.get("DeviceId")
            if device_id:
                return device_id
        raise ValueError("Device ID not found in cookies.")

    def _get_customer_config(self) -> dict:
        """Fetch and return the customer config section from Amazon's startup config.

        Raises:
            ValueError: If the response has no (or an empty) ``customerConfig``.
        """
        data = self._fetch_customer_config()
        customer_config = data.get("customerConfig", {})
        if not customer_config:
            raise ValueError("Customer config not found in response.")
        return customer_config

    def _device_params(self, title_id: str) -> dict[str, Any]:
        """Build the query parameters shared by the playback and licence requests."""
        return {
            "deviceID": self.device_id,
            "deviceTypeID": self.config["devine"]["deviceTypeID"],
            "marketplaceID": self.customer_config.get("marketplaceID", ""),
            "titleId": title_id,
        }

    # ------------------------------------------------------------------
    # Title retrieval
    # ------------------------------------------------------------------

    def get_titles(self) -> Movies | Series:
        """Fetch title metadata and return a Movies or Series object.

        Raises:
            ValueError: If the URL does not match TITLE_RE, the metadata is
                empty, or the page's ``subPageType`` is neither ``Movie`` nor
                ``Season``.
        """
        match = re.match(self.TITLE_RE, self.title)
        if not match:
            raise ValueError(f"Invalid title URL format: {self.title}")

        # The "id" group requires at least one character, so it is never empty here.
        content_id = match.group("id")
        self._title_id = content_id

        metadata = self._fetch_metadata(content_id)
        if not metadata:
            raise ValueError(f"Failed to fetch metadata for content ID: {content_id}.")

        page_metadata = metadata.get("head", {}).get("pageMetadata", {})
        sub_page_type = page_metadata.get("subPageType")

        if sub_page_type == "Movie":
            return self._parse_movie(metadata)
        if sub_page_type == "Season":
            return self._parse_series(metadata)
        raise ValueError(f"Unsupported content type: {sub_page_type}")

    @staticmethod
    def _find_playback_envelope(actions: dict) -> str | None:
        """Return the playbackEnvelope of the first PLAY primary action, if any.

        Only the first action whose ``actionType`` is ``PLAY`` is considered;
        if it carries no envelope the result is None.
        """
        for action in actions.get("primaryActions", []):
            if action.get("actionType") == "PLAY":
                return action.get("payload", {}).get("playback", {}).get("playbackEnvelope")
        return None

    def _parse_movie(self, metadata: dict) -> Movies:
        """Parse movie metadata into a Movies object."""
        atf_state = metadata.get("body", {}).get("atf", {}).get("state", {})
        page_title_id = atf_state.get("pageTitleId", "")
        header_detail = atf_state.get("detail", {}).get("headerDetail", {})
        self_info = atf_state.get("self", {})

        movie_info = header_detail.get(page_title_id, {})
        movie_self = self_info.get(page_title_id, {})

        movie_name = movie_info.get("title", "Unknown Movie")
        release_year = movie_info.get("releaseYear")
        detail_id = movie_self.get("compactGTI", "")

        # Try to extract playbackEnvelope from the PLAY primary action
        actions = atf_state.get("action", {}).get("atf", {}).get(page_title_id, {})
        playback_envelope = self._find_playback_envelope(actions)

        return Movies([
            Movie(
                id_=page_title_id,
                service=AMZN,
                name=movie_name,
                data={
                    "pageTitleId": page_title_id,
                    "detailId": detail_id,
                    "gti": page_title_id,
                    "playback_envelope": playback_envelope,
                },
                year=release_year,
            )
        ])

    def _parse_series(self, metadata: dict) -> Series:
        """Parse series metadata and fetch the episodes of every listed season.

        Raises:
            ValueError: If no episodes could be collected.
        """
        atf_state = metadata.get("body", {}).get("atf", {}).get("state", {})
        header_detail = atf_state.get("detail", {}).get("headerDetail", {})
        page_title_id = atf_state.get("pageTitleId", "")

        header_info = header_detail.get(page_title_id, {})
        series_name = header_info.get("parentTitle", "Unknown Series")

        season_list = atf_state.get("seasons", {}).get(page_title_id, [])
        all_episodes: list[Episode] = []

        if season_list:
            # Each season has its own detail page; fetch and parse them one by one.
            for season in season_list:
                season_id = season.get("seasonId", "")
                season_number = season.get("sequenceNumber", 1)
                season_metadata = self._fetch_metadata(season_id)
                if season_metadata:
                    all_episodes.extend(
                        self._parse_season_episodes(season_metadata, series_name, season_number)
                    )
        else:
            # No seasons — try to parse episodes directly from the current page
            all_episodes.extend(self._parse_season_episodes(metadata, series_name, 1))

        if not all_episodes:
            raise ValueError(f"No episodes found for series: {series_name}")

        return Series(all_episodes)

    def _parse_season_episodes(
        self, metadata: dict, series_name: str, season_number: int
    ) -> list[Episode]:
        """Parse episodes from a season's metadata.

        Episodes are read from the BTF state first; ATF entries are added only
        for GTIs not already seen.
        """
        body = metadata.get("body", {})
        atf_state = body.get("atf", {}).get("state", {})
        btf_state = body.get("btf", {}).get("state", {})

        # Keyed by GTI so that the same episode is never added twice.
        all_episodes: dict[str, Episode] = {}

        # BTF episodes (primary source)
        episodes_detail_btf = btf_state.get("detail", {}).get("detail", {})
        actions_btf = btf_state.get("action", {}).get("btf", {})
        for gti, info in episodes_detail_btf.items():
            if info.get("titleType") == "episode":
                episode = self._build_episode(
                    gti, info, actions_btf.get(gti, {}), series_name, season_number
                )
                if episode:
                    all_episodes[gti] = episode

        # ATF episodes (fallback / supplement)
        episodes_detail_atf = atf_state.get("detail", {}).get("detail", {})
        actions_atf = atf_state.get("action", {}).get("atf", {})
        for gti, info in episodes_detail_atf.items():
            if info.get("titleType") == "episode" and gti not in all_episodes:
                episode = self._build_episode(
                    gti, info, actions_atf.get(gti, {}), series_name, season_number
                )
                if episode:
                    all_episodes[gti] = episode

        return list(all_episodes.values())

    def _build_episode(
        self,
        gti: str,
        info: dict,
        actions: dict,
        series_name: str,
        season_number: int,
    ) -> Episode | None:
        """Build an Episode object from metadata.

        The playback envelope is taken from the PLAY primary action in
        ``actions`` and stored in the episode's ``data`` (None if absent).
        """
        episode_title = info.get("title", "Unknown Episode")
        episode_number = info.get("episodeNumber")

        # Locate playbackEnvelope inside the PLAY primary action
        playback_envelope = self._find_playback_envelope(actions)

        return Episode(
            id_=gti,
            service=AMZN,
            name=episode_title,
            season=season_number,
            number=episode_number,
            title=series_name,
            data={"gti": gti, "playback_envelope": playback_envelope},
        )

    # ------------------------------------------------------------------
    # Track retrieval
    # ------------------------------------------------------------------

    def get_tracks(self, title: Episode | Movie) -> Tracks:
        """Fetch playback sources for a title and return its tracks.

        Video/audio tracks come from the DASH manifest; subtitle tracks come
        from the playback sources response. Each manifest track is tagged with
        the session handoff token and, when available, the Widevine service
        certificate, so the licence methods can read them later.

        Raises:
            TypeError: If ``title`` is not an Episode or Movie.
            ValueError: If the title lacks a content ID or playback envelope,
                or the playback sources lack a manifest URL or handoff token.
        """
        if not isinstance(title, (Episode, Movie)):
            raise TypeError(f"Expected Episode or Movie, got {type(title).__name__}.")

        content_id = title.data.get("gti")
        if not content_id:
            raise ValueError("No content ID found in title data.")

        playback_envelope = title.data.get("playback_envelope")
        if not playback_envelope:
            raise ValueError("No playback_envelope found in title.data.")

        playback_data = self._fetch_playback_sources(playback_envelope, content_id)
        if not playback_data:
            raise ValueError(f"Failed to fetch playback sources for title ID: {content_id}.")

        widevine_cert = self._extract_widevine_service_certificate(playback_data)
        session_handoff_token = self._extract_session_handoff_token(playback_data)

        manifest_url = self._extract_manifest_url(playback_data)
        if not manifest_url:
            raise ValueError("No manifest URL found in playback sources.")

        all_tracks = DASH.from_url(manifest_url, self.session).to_tracks(language="de")

        # Attach DRM metadata to every track in a single pass
        for track in all_tracks:
            track.data["session_handoff_token"] = session_handoff_token
            if widevine_cert:
                track.data["widevine_service_certificate"] = widevine_cert

        subtitle_tracks = self._extract_subtitle_tracks(playback_data)
        return Tracks(all_tracks + subtitle_tracks)

    def get_chapters(self, title: Episode | Movie) -> Chapters:
        """Return an empty Chapters collection; no chapter data is extracted."""
        return Chapters()

    # ------------------------------------------------------------------
    # Widevine DRM
    # ------------------------------------------------------------------

    def get_widevine_service_certificate(
        self, *, challenge: bytes, title: Episode | Movie, track: AnyTrack
    ) -> bytes | None:
        """Return the pre-fetched Widevine service certificate stored on the track."""
        return track.data.get("widevine_service_certificate")

    def get_widevine_license(
        self,
        *,
        challenge: bytes,
        title: Episode | Movie,
        track: AnyTrack,
    ) -> bytes | None:
        """Obtain a Widevine license for the given track.

        Returns:
            The decoded license bytes, or None if the response carries no license.

        Raises:
            ValueError: If the title has no playback envelope or the track has
                no session handoff token (both are stored during title/track
                retrieval).
        """
        licence_endpoint = self.config["endpoints"]["licence_endpoint"]
        title_id = title.data.get("gti", "")

        playback_envelope = title.data.get("playback_envelope")
        if not playback_envelope:
            raise ValueError("No playback envelope found in title data.")

        session_handoff_token = track.data.get("session_handoff_token")
        if not session_handoff_token:
            raise ValueError("No session handoff token found in track data.")

        json_data = {
            "includeHdcpTestKey": True,
            "playbackEnvelope": playback_envelope,
            "sessionHandoffToken": session_handoff_token,
            "licenseChallenge": base64.b64encode(challenge).decode("utf-8"),
        }

        response = self.session.post(
            licence_endpoint,
            headers={**self.session.headers, "Content-Type": "application/json"},
            params=self._device_params(title_id),
            json=json_data,
        )
        response.raise_for_status()

        encoded_license = response.json().get("widevineLicense", {}).get("license")
        if not encoded_license:
            # Only report success when the response actually holds a license.
            self.log.warning("No Widevine license found in license response.")
            return None

        self.log.info("Successfully fetched Widevine license.")
        return base64.b64decode(encoded_license)

    # ------------------------------------------------------------------
    # Playback data extraction helpers
    # ------------------------------------------------------------------

    def _extract_manifest_url(self, playback_data: dict) -> str | None:
        """Extract the DASH manifest URL from playback sources data.

        Returns the first URL of the first section typed ``Main`` that has
        URLs; otherwise the first URL of any section; otherwise None.
        """
        intra_title = (
            playback_data
            .get("vodPlaylistedPlaybackUrls", {})
            .get("result", {})
            .get("playbackUrls", {})
            .get("intraTitlePlaylist", [])
        )
        if not intra_title:
            return None

        # Prefer the section explicitly marked as "Main"
        for section in intra_title:
            if section.get("type") == "Main":
                urls = section.get("urls", [])
                if urls:
                    return urls[0].get("url")

        # Fallback: first available URL (older manifests may lack a "type" key)
        for section in intra_title:
            urls = section.get("urls", [])
            if urls:
                return urls[0].get("url")

        return None

    def _extract_subtitle_tracks(self, playback_data: dict) -> list[Subtitle]:
        """Extract subtitle tracks from playback sources data.

        Entries without a language code or URL are skipped.
        """
        timed_text = playback_data.get("timedTextUrls", {}).get("result", {})
        subtitle_tracks: list[Subtitle] = []

        for subtitle in timed_text.get("subtitleUrls", []):
            language = subtitle.get("languageCode", "")
            url = subtitle.get("url", "")
            fmt = subtitle.get("format", "")
            sub_type = subtitle.get("type", "")

            # Normalise Amazon's format name before the codec lookup.
            # NOTE(review): "DFXP" is also requested in _fetch_playback_sources
            # but is passed through unmapped — confirm from_mime() accepts it.
            if fmt == "TTMLv2":
                fmt = "ttml"

            if language and url:
                # NOTE(review): sdh/forced are hard-coded and ignore sub_type —
                # confirm whether they should be derived from the "type" field.
                subtitle_tracks.append(Subtitle(
                    id_=f"{language}_{sub_type}_{fmt}",
                    url=url,
                    codec=Subtitle.Codec.from_mime(fmt),
                    language=language,
                    is_original_lang=False,
                    forced=False,
                    sdh=True,
                ))

        return subtitle_tracks

    def _extract_widevine_service_certificate(self, playback_data: dict) -> bytes | None:
        """Decode and return the Widevine service certificate from playback data.

        Expected structure::

            {
                "widevineServiceCertificate": {
                    "result": {
                        "encodedServiceCertificate": "<base64-encoded certificate>"
                    }
                }
            }

        Returns None (after logging a warning) when no certificate is present.
        """
        encoded_cert = (
            playback_data
            .get("widevineServiceCertificate", {})
            .get("result", {})
            .get("encodedServiceCertificate")
        )
        if encoded_cert:
            self.log.info("Successfully extracted Widevine service certificate.")
            return base64.b64decode(encoded_cert)

        self.log.warning("No Widevine service certificate found in playback data.")
        return None

    def _extract_session_handoff_token(self, playback_data: dict) -> str:
        """Extract the session handoff token from playback data.

        Expected structure::

            {
                "sessionization": {
                    "sessionHandoffToken": "<token>"
                }
            }

        Raises:
            ValueError: If the token is missing or empty.
        """
        token = playback_data.get("sessionization", {}).get("sessionHandoffToken")
        if token:
            return token
        raise ValueError("No session handoff token found in playback data.")

    # ------------------------------------------------------------------
    # Network fetch helpers
    # ------------------------------------------------------------------

    def _fetch_metadata(self, content_id: str) -> dict | None:
        """Fetch JSON metadata for a given content ID from the detail page."""
        url = self.config["endpoints"]["metadata_endpoint"].format(id=content_id)
        response = self.session.get(url)
        response.raise_for_status()
        return response.json()

    def _fetch_enriched_metadata(self, content_ids: list[str]) -> dict | None:
        """Fetch enriched hover/playback metadata for a list of content IDs.

        This method is available for future use (e.g. pre-fetching playback
        envelopes in bulk) but is not called by the main ingest flow.
        """
        url = self.config["endpoints"]["enriched_metadata_endpoint"]
        # Form-encoded body; the two structured fields are sent as JSON strings.
        data = {
            "metadataToEnrich": json.dumps({
                "placement": "HOVER",
                "playback": True,
                "preroll": True,
                "trailer": True,
                "watchlist": True,
            }),
            "titleIDsToEnrich": json.dumps(content_ids),
            "journeyIngressContext": "",
            "currentUrl": "https://www.primevideo.com/",
        }
        response = self.session.post(
            url,
            headers={**self.session.headers, "Content-Type": "application/x-www-form-urlencoded"},
            data=data,
        )
        response.raise_for_status()
        return response.json()

    def _fetch_playback_sources(self, playback_envelope: str, title_id: str) -> dict | None:
        """Fetch all playback resources (manifest, DRM, subtitles) for a title."""
        url = self.config["endpoints"]["playback_sources_endpoint"]

        json_data = {
            "globalParameters": {
                "deviceCapabilityFamily": "WebPlayer",
                "playbackEnvelope": playback_envelope,
                "capabilityDiscriminators": {
                    "operatingSystem": {"name": "Windows", "version": "Unknown"},
                    "deviceModel": {"name": "Unknown", "version": "UNKNOWN"},
                    "nativeApplication": {"name": "PVPWA", "version": "Unknown"},
                    "hfrControlMode": "Legacy",
                    "displayResolution": {"height": 1440, "width": 2560},
                },
            },
            "timedTextUrlsRequest": {
                "supportedTimedTextFormats": ["TTMLv2", "DFXP"],
            },
            "vodPlaylistedPlaybackUrlsRequest": {
                "device": {
                    "hdcpLevel": "1.4",
                    "maxVideoResolution": "1080p",
                    "supportedStreamingTechnologies": ["DASH"],
                    "streamingTechnologies": {
                        "DASH": {
                            "bitrateAdaptations": ["CBR", "CVBR"],
                            "codecs": ["H264"],
                            "drmKeyScheme": "DualKey",
                            "drmType": "Widevine",
                            "dynamicRangeFormats": ["None"],
                            "edgeDeliveryAuthorizationSchemes": ["PVExchangeV1", "Transparent"],
                            "fragmentRepresentations": ["ByteOffsetRange", "SeparateFile"],
                            "frameRates": ["Standard", "High"],
                            "stitchType": "MultiPeriod",
                            "segmentInfoType": "Base",
                            "timedTextRepresentations": [
                                "NotInManifestNorStream",
                                "SeparateStreamInManifest",
                            ],
                            "trickplayRepresentations": ["NotInManifestNorStream"],
                            "variableAspectRatio": "supported",
                        }
                    },
                    "displayWidth": 2560,
                    "displayHeight": 1440,
                },
                "playbackSettingsRequest": {
                    "deviceModel": "Unknown",
                    "firmware": "UNKNOWN",
                    "playerType": "xp",
                    "responseFormatVersion": "1.0.0",
                    "titleId": title_id,
                },
            },
            "vodXrayMetadataRequest": {
                "xrayDeviceClass": "normal",
                "xrayPlaybackMode": "playback",
                "xrayToken": "XRAY_WEB_2023_V2",
            },
        }

        response = self.session.post(url, params=self._device_params(title_id), json=json_data)
        response.raise_for_status()
        return response.json()

    def _fetch_customer_config(self) -> dict:
        """Fetch the Amazon startup/customer config for this device."""
        url = self.config["endpoints"]["customer_config_endpoint"]
        device_type_id = self.config["devine"]["deviceTypeID"]
        params = {
            "deviceID": self.device_id,
            "deviceTypeID": device_type_id,
            "format": "json",
        }
        response = self.session.get(url, params=params)
        response.raise_for_status()
        return response.json()