From f6794463f939b887f693a1501624d52cbecb9a23 Mon Sep 17 00:00:00 2001 From: blackicedbear Date: Tue, 14 Apr 2026 17:38:29 +0200 Subject: [PATCH] Add AMZN (Amazon Prime Video) service --- AMZN/__init__.py | 595 +++++++++++++++++++++++++++++++++++++++++++++++ AMZN/config.yaml | 12 + 2 files changed, 607 insertions(+) create mode 100644 AMZN/__init__.py create mode 100644 AMZN/config.yaml diff --git a/AMZN/__init__.py b/AMZN/__init__.py new file mode 100644 index 0000000..3d7105c --- /dev/null +++ b/AMZN/__init__.py @@ -0,0 +1,595 @@ +from __future__ import annotations + +import base64 +import json +import re +from http.cookiejar import MozillaCookieJar +from typing import Any + +import click +import requests + +from devine.core.constants import AnyTrack +from devine.core.credential import Credential +from devine.core.manifests import DASH +from devine.core.service import Service +from devine.core.titles import Episode, Movie, Movies, Series +from devine.core.tracks import Chapter, Chapters, Subtitle, Tracks + + +class AMZN(Service): + """Amazon Prime Video streaming service.""" + + # List of Service Aliases. Do NOT include the Service Tag. All aliases must be lowercase. + ALIASES = () + + # List of regions of which the service offers support for. + GEOFENCE = ("de", "at", "ch") + + TITLE_RE = r"^https?:\/\/(?:www\.)?primevideo\.com\/(?:-\/[a-zA-Z-]{2,5}\/)?detail\/(?P[A-Z0-9]+)(?:[\/\?].*)?$" + + @staticmethod + @click.command(name="AMZN", short_help="https://primevideo.com", help=__doc__) + @click.argument("title", type=str) + @click.pass_context + def cli(ctx: click.Context, **kwargs: Any) -> AMZN: + return AMZN(ctx, **kwargs) + + def __init__(self, ctx: click.Context, title: str) -> None: + self.title = title + self._title_id: str | None = None + + super().__init__(ctx) + + # ------------------------------------------------------------------ + # Session / Authentication + # ------------------------------------------------------------------ + + def get_session(self) -> requests.Session: + session = super().get_session() + session.headers.update({ + "Accept": "application/json", + "Sec-Fetch-Dest": "empty", + "Sec-Fetch-Mode": "cors", + "Sec-Fetch-Site": "same-site", + "User-Agent": ( + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/146.0.0.0 Safari/537.36 Edg/146.0.0.0" + ), + }) + return session + + def authenticate( + self, + cookies: MozillaCookieJar | None = None, + credential: Credential | None = None, + ) -> None: + super().authenticate(cookies, credential) + + if not cookies: + raise ValueError( + "Authentication for Amazon Prime Video requires cookies. " + "Please provide a MozillaCookieJar with the necessary cookies." + ) + + self.device_id = self._get_device_id() + self.customer_config = self._get_customer_config() + + # ------------------------------------------------------------------ + # Device / Customer config helpers + # ------------------------------------------------------------------ + + def _get_device_id(self) -> str: + """Extract the DeviceId from the av-native-app cookie. + + Cookie format: ``AppName=PVPWA-MICROSOFT_STORE|DeviceId=`` + """ + for cookie in self.session.cookies: + if cookie.name == "av-native-app": + parts = dict(pair.split("=", 1) for pair in cookie.value.split("|") if "=" in pair) + device_id = parts.get("DeviceId") + if device_id: + return device_id + + raise ValueError("Device ID not found in cookies.") + + def _get_customer_config(self) -> dict: + """Fetch and return the customer config section from Amazon's startup config.""" + data = self._fetch_customer_config() + customer_config = data.get("customerConfig", {}) + if not customer_config: + raise ValueError("Customer config not found in response.") + return customer_config + + # ------------------------------------------------------------------ + # Title retrieval + # ------------------------------------------------------------------ + + def get_titles(self) -> Movies | Series: + """Fetch title metadata and return a Movies or Series object.""" + match = re.match(self.TITLE_RE, self.title) + if not match: + raise ValueError(f"Invalid title URL format: {self.title}") + + content_id = match.group("id") + if not content_id: + raise ValueError(f"Invalid title URL: {self.title}.") + + self._title_id = content_id + + metadata = self._fetch_metadata(content_id) + if not metadata: + raise ValueError(f"Failed to fetch metadata for content ID: {content_id}.") + + page_metadata = metadata.get("head", {}).get("pageMetadata", {}) + sub_page_type = page_metadata.get("subPageType") + + if sub_page_type == "Movie": + return self._parse_movie(metadata) + elif sub_page_type == "Season": + return self._parse_series(metadata) + else: + raise ValueError(f"Unsupported content type: {sub_page_type}") + + def _parse_movie(self, metadata: dict) -> Movies: + """Parse movie metadata into a Movies object.""" + atf_state = metadata.get("body", {}).get("atf", {}).get("state", {}) + page_title_id = atf_state.get("pageTitleId", "") + header_detail = atf_state.get("detail", {}).get("headerDetail", {}) + self_info = atf_state.get("self", {}) + + movie_info = header_detail.get(page_title_id, {}) + movie_self = self_info.get(page_title_id, {}) + + movie_name = movie_info.get("title", "Unknown Movie") + release_year = movie_info.get("releaseYear") + detail_id = movie_self.get("compactGTI", "") + + # Try to extract playbackEnvelope from the PLAY primary action + playback_envelope: str | None = None + actions = atf_state.get("action", {}).get("atf", {}).get(page_title_id, {}) + for action in actions.get("primaryActions", []): + if action.get("actionType") == "PLAY": + playback_envelope = ( + action.get("payload", {}).get("playback", {}).get("playbackEnvelope") + ) + break + + return Movies([ + Movie( + id_=page_title_id, + service=AMZN, + name=movie_name, + data={ + "pageTitleId": page_title_id, + "detailId": detail_id, + "gti": page_title_id, + "playback_envelope": playback_envelope, + }, + year=release_year, + ) + ]) + + def _parse_series(self, metadata: dict) -> Series: + """Parse series metadata and recursively fetch all episodes.""" + atf_state = metadata.get("body", {}).get("atf", {}).get("state", {}) + header_detail = atf_state.get("detail", {}).get("headerDetail", {}) + + page_title_id = atf_state.get("pageTitleId", "") + header_info = header_detail.get(page_title_id, {}) + series_name = header_info.get("parentTitle", "Unknown Series") + + season_list = atf_state.get("seasons", {}).get(page_title_id, []) + + all_episodes: list[Episode] = [] + + if season_list: + for season in season_list: + season_id = season.get("seasonId", "") + season_number = season.get("sequenceNumber", 1) + season_metadata = self._fetch_metadata(season_id) + if season_metadata: + all_episodes.extend( + self._parse_season_episodes(season_metadata, series_name, season_number) + ) + else: + # No seasons — try to parse episodes directly from the current page + all_episodes.extend(self._parse_season_episodes(metadata, series_name, 1)) + + if not all_episodes: + raise ValueError(f"No episodes found for series: {series_name}") + + return Series(all_episodes) + + def _parse_season_episodes( + self, metadata: dict, series_name: str, season_number: int + ) -> list[Episode]: + """Parse episodes from a season's metadata.""" + atf_state = metadata.get("body", {}).get("atf", {}).get("state", {}) + btf_state = metadata.get("body", {}).get("btf", {}).get("state", {}) + + all_episodes: dict[str, Episode] = {} + + # BTF episodes (primary source) + episodes_detail_btf = btf_state.get("detail", {}).get("detail", {}) + actions_btf = btf_state.get("action", {}).get("btf", {}) + for gti, info in episodes_detail_btf.items(): + if info.get("titleType") == "episode": + episode = self._build_episode(gti, info, actions_btf.get(gti, {}), series_name, season_number) + if episode: + all_episodes[gti] = episode + + # ATF episodes (fallback / supplement) + episodes_detail_atf = atf_state.get("detail", {}).get("detail", {}) + actions_atf = atf_state.get("action", {}).get("atf", {}) + for gti, info in episodes_detail_atf.items(): + if info.get("titleType") == "episode" and gti not in all_episodes: + episode = self._build_episode(gti, info, actions_atf.get(gti, {}), series_name, season_number) + if episode: + all_episodes[gti] = episode + + return list(all_episodes.values()) + + def _build_episode( + self, + gti: str, + info: dict, + actions: dict, + series_name: str, + season_number: int, + ) -> Episode | None: + """Build an Episode object from metadata.""" + episode_title = info.get("title", "Unknown Episode") + episode_number = info.get("episodeNumber") + + # Locate playbackEnvelope inside the PLAY primary action + playback_envelope: str | None = None + for action in actions.get("primaryActions", []): + if action.get("actionType") == "PLAY": + playback_envelope = ( + action.get("payload", {}).get("playback", {}).get("playbackEnvelope") + ) + break + + return Episode( + id_=gti, + service=AMZN, + name=episode_title, + season=season_number, + number=episode_number, + title=series_name, + data={"gti": gti, "playback_envelope": playback_envelope}, + ) + + # ------------------------------------------------------------------ + # Track retrieval + # ------------------------------------------------------------------ + + def get_tracks(self, title: Episode | Movie) -> Tracks: + if not isinstance(title, (Episode, Movie)): + raise TypeError(f"Expected Episode or Movie, got {type(title).__name__}.") + + content_id = title.data.get("gti") + if not content_id: + raise ValueError("No content ID found in title data.") + + playback_envelope = title.data.get("playback_envelope") + if not playback_envelope: + raise ValueError("No playback_envelope found in title.data.") + + playback_data = self._fetch_playback_sources(playback_envelope, content_id) + if not playback_data: + raise ValueError(f"Failed to fetch playback sources for title ID: {content_id}.") + + widevine_cert = self._extract_widevine_service_certificate(playback_data) + session_handoff_token = self._extract_session_handoff_token(playback_data) + + manifest_url = self._extract_manifest_url(playback_data) + if not manifest_url: + raise ValueError("No manifest URL found in playback sources.") + + all_tracks = DASH.from_url(manifest_url, self.session).to_tracks(language="de") + + # Attach DRM metadata to every track in a single pass + for track in all_tracks: + track.data["session_handoff_token"] = session_handoff_token + if widevine_cert: + track.data["widevine_service_certificate"] = widevine_cert + + subtitle_tracks = self._extract_subtitle_tracks(playback_data) + + return Tracks(all_tracks + subtitle_tracks) + + def get_chapters(self, title: Movies | Series) -> Chapters: + return Chapters() + + # ------------------------------------------------------------------ + # Widevine DRM + # ------------------------------------------------------------------ + + def get_widevine_service_certificate( + self, *, challenge: bytes, title: Movies | Series, track: AnyTrack + ) -> bytes | None: + """Return the pre-fetched Widevine service certificate stored on the track.""" + return track.data.get("widevine_service_certificate") + + def get_widevine_license( + self, + *, + challenge: bytes, + title: Movies | Series, + track: AnyTrack, + ) -> bytes | None: + """Obtain a Widevine license for the given track.""" + licence_endpoint = self.config["endpoints"]["licence_endpoint"] + device_type_id = self.config["devine"]["deviceTypeID"] + title_id = title.data.get("gti", "") + + playback_envelope = title.data.get("playback_envelope") + if not playback_envelope: + raise ValueError("No playback envelope found in title data.") + + session_handoff_token = track.data.get("session_handoff_token") + if not session_handoff_token: + raise ValueError("No session handoff token found in track data.") + + params = { + "deviceID": self.device_id, + "deviceTypeID": device_type_id, + "marketplaceID": self.customer_config.get("marketplaceID", ""), + "titleId": title_id, + } + + json_data = { + "includeHdcpTestKey": True, + "playbackEnvelope": playback_envelope, + "sessionHandoffToken": session_handoff_token, + "licenseChallenge": base64.b64encode(challenge).decode("utf-8"), + } + + response = self.session.post( + licence_endpoint, + headers={**self.session.headers, "Content-Type": "application/json"}, + params=params, + json=json_data, + ) + response.raise_for_status() + + self.log.info("Successfully fetched Widevine license.") + + license_data = response.json() + encoded_license = license_data.get("widevineLicense", {}).get("license") + if encoded_license: + return base64.b64decode(encoded_license) + + return None + + # ------------------------------------------------------------------ + # Playback data extraction helpers + # ------------------------------------------------------------------ + + def _extract_manifest_url(self, playback_data: dict) -> str | None: + """Extract the DASH manifest URL from playback sources data.""" + intra_title = ( + playback_data + .get("vodPlaylistedPlaybackUrls", {}) + .get("result", {}) + .get("playbackUrls", {}) + .get("intraTitlePlaylist", []) + ) + + if not intra_title: + return None + + # Prefer the section explicitly marked as "Main" + for section in intra_title: + if section.get("type") == "Main": + urls = section.get("urls", []) + if urls: + return urls[0].get("url") + + # Fallback: first available URL (older manifests may lack a "type" key) + for section in intra_title: + urls = section.get("urls", []) + if urls: + return urls[0].get("url") + + return None + + def _extract_subtitle_tracks(self, playback_data: dict) -> list[Subtitle]: + """Extract subtitle tracks from playback sources data.""" + timed_text = playback_data.get("timedTextUrls", {}).get("result", {}) + subtitle_tracks: list[Subtitle] = [] + + for subtitle in timed_text.get("subtitleUrls", []): + language = subtitle.get("languageCode", "") + url = subtitle.get("url", "") + fmt = subtitle.get("format", "") + sub_type = subtitle.get("type", "") + + if fmt == "TTMLv2": + fmt = "ttml" + + if language and url: + subtitle_tracks.append(Subtitle( + id_=f"{language}_{sub_type}_{fmt}", + url=url, + codec=Subtitle.Codec.from_mime(fmt), + language=language, + is_original_lang=False, + forced=False, + sdh=True, + )) + + return subtitle_tracks + + def _extract_widevine_service_certificate(self, playback_data: dict) -> bytes | None: + """Decode and return the Widevine service certificate from playback data. + + Expected structure:: + + { + "widevineServiceCertificate": { + "result": { + "encodedServiceCertificate": "" + } + } + } + """ + encoded_cert = ( + playback_data + .get("widevineServiceCertificate", {}) + .get("result", {}) + .get("encodedServiceCertificate") + ) + + if encoded_cert: + self.log.info("Successfully extracted Widevine service certificate.") + return base64.b64decode(encoded_cert) + + self.log.warning("No Widevine service certificate found in playback data.") + return None + + def _extract_session_handoff_token(self, playback_data: dict) -> str: + """Extract the session handoff token from playback data. + + Expected structure:: + + { + "sessionization": { + "sessionHandoffToken": "" + } + } + """ + token = playback_data.get("sessionization", {}).get("sessionHandoffToken") + if token: + return token + raise ValueError("No session handoff token found in playback data.") + + # ------------------------------------------------------------------ + # Network fetch helpers + # ------------------------------------------------------------------ + + def _fetch_metadata(self, content_id: str) -> dict | None: + """Fetch JSON metadata for a given content ID from the detail page.""" + url = self.config["endpoints"]["metadata_endpoint"].format(id=content_id) + response = self.session.get(url) + response.raise_for_status() + return response.json() + + def _fetch_enriched_metadata(self, content_ids: list[str]) -> dict | None: + """Fetch enriched hover/playback metadata for a list of content IDs. + + This method is available for future use (e.g. pre-fetching playback + envelopes in bulk) but is not called by the main ingest flow. + """ + url = self.config["endpoints"]["enriched_metadata_endpoint"] + data = { + "metadataToEnrich": json.dumps({ + "placement": "HOVER", + "playback": True, + "preroll": True, + "trailer": True, + "watchlist": True, + }), + "titleIDsToEnrich": json.dumps(content_ids), + "journeyIngressContext": "", + "currentUrl": "https://www.primevideo.com/", + } + response = self.session.post( + url, + headers={**self.session.headers, "Content-Type": "application/x-www-form-urlencoded"}, + data=data, + ) + response.raise_for_status() + return response.json() + + def _fetch_playback_sources(self, playback_envelope: str, title_id: str) -> dict | None: + """Fetch all playback resources (manifest, DRM, subtitles) for a title.""" + url = self.config["endpoints"]["playback_sources_endpoint"] + device_type_id = self.config["devine"]["deviceTypeID"] + + params = { + "deviceID": self.device_id, + "deviceTypeID": device_type_id, + "marketplaceID": self.customer_config.get("marketplaceID", ""), + "titleId": title_id, + } + + json_data = { + "globalParameters": { + "deviceCapabilityFamily": "WebPlayer", + "playbackEnvelope": playback_envelope, + "capabilityDiscriminators": { + "operatingSystem": {"name": "Windows", "version": "Unknown"}, + "deviceModel": {"name": "Unknown", "version": "UNKNOWN"}, + "nativeApplication": {"name": "PVPWA", "version": "Unknown"}, + "hfrControlMode": "Legacy", + "displayResolution": {"height": 1440, "width": 2560}, + }, + }, + "timedTextUrlsRequest": { + "supportedTimedTextFormats": ["TTMLv2", "DFXP"], + }, + "vodPlaylistedPlaybackUrlsRequest": { + "device": { + "hdcpLevel": "1.4", + "maxVideoResolution": "1080p", + "supportedStreamingTechnologies": ["DASH"], + "streamingTechnologies": { + "DASH": { + "bitrateAdaptations": ["CBR", "CVBR"], + "codecs": ["H264"], + "drmKeyScheme": "DualKey", + "drmType": "Widevine", + "dynamicRangeFormats": ["None"], + "edgeDeliveryAuthorizationSchemes": ["PVExchangeV1", "Transparent"], + "fragmentRepresentations": ["ByteOffsetRange", "SeparateFile"], + "frameRates": ["Standard", "High"], + "stitchType": "MultiPeriod", + "segmentInfoType": "Base", + "timedTextRepresentations": [ + "NotInManifestNorStream", + "SeparateStreamInManifest", + ], + "trickplayRepresentations": ["NotInManifestNorStream"], + "variableAspectRatio": "supported", + } + }, + "displayWidth": 2560, + "displayHeight": 1440, + }, + "playbackSettingsRequest": { + "deviceModel": "Unknown", + "firmware": "UNKNOWN", + "playerType": "xp", + "responseFormatVersion": "1.0.0", + "titleId": title_id, + }, + }, + "vodXrayMetadataRequest": { + "xrayDeviceClass": "normal", + "xrayPlaybackMode": "playback", + "xrayToken": "XRAY_WEB_2023_V2", + }, + } + + response = self.session.post(url, params=params, json=json_data) + response.raise_for_status() + return response.json() + + def _fetch_customer_config(self) -> dict: + """Fetch the Amazon startup/customer config for this device.""" + url = self.config["endpoints"]["customer_config_endpoint"] + device_type_id = self.config["devine"]["deviceTypeID"] + + params = { + "deviceID": self.device_id, + "deviceTypeID": device_type_id, + "format": "json", + } + + response = self.session.get(url, params=params) + response.raise_for_status() + return response.json() \ No newline at end of file diff --git a/AMZN/config.yaml b/AMZN/config.yaml new file mode 100644 index 0000000..db19a55 --- /dev/null +++ b/AMZN/config.yaml @@ -0,0 +1,12 @@ +# This config file is automatically loaded into `self.config` class instance variable. + +endpoints: + metadata_endpoint: 'https://www.primevideo.com/detail/{id}' + enriched_metadata_endpoint: 'https://www.primevideo.com/api/enrichItemMetadata' + playback_sources_endpoint: 'https://atv-ps-eu.primevideo.com/playback/prs/GetVodPlaybackResources' + licence_endpoint: 'https://atv-ps-eu.primevideo.com/playback/drm-vod/GetWidevineLicense' + customer_config_endpoint: 'https://atv-ps-eu.primevideo.com/cdp/usage/GetAppStartupConfig' + +devine: + deviceTypeID: 'A3OEMT9S4WNMK' + deviceID: 'WEB' \ No newline at end of file