# Files
#
# 595 lines
# 23 KiB
# Python

from __future__ import annotations
import base64
import json
import re
from http.cookiejar import MozillaCookieJar
from typing import Any
import click
import requests
from devine.core.constants import AnyTrack
from devine.core.credential import Credential
from devine.core.manifests import DASH
from devine.core.service import Service
from devine.core.titles import Episode, Movie, Movies, Series
from devine.core.tracks import Chapter, Chapters, Subtitle, Tracks
class AMZN(Service):
    """Amazon Prime Video streaming service."""
    # NOTE: the class docstring above is also passed to click as the command's
    # help text (``help=__doc__`` on ``cli``), so edit it with that in mind.

    # Service aliases. Do NOT include the service tag; every alias must be lowercase.
    ALIASES = ()
    # Regions in which the service is supported.
    GEOFENCE = ("de", "at", "ch")
    # Matches primevideo.com detail-page URLs (optionally prefixed with
    # "/-/<locale>/") and captures the content ID in the named group "id".
    TITLE_RE = r"^https?:\/\/(?:www\.)?primevideo\.com\/(?:-\/[a-zA-Z-]{2,5}\/)?detail\/(?P<id>[A-Z0-9]+)(?:[\/\?].*)?$"
# Click entry point for the service: registers the "AMZN" command with a single
# positional ``title`` argument and receives the click context via ``pass_context``.
@staticmethod
@click.command(name="AMZN", short_help="https://primevideo.com", help=__doc__)
@click.argument("title", type=str)
@click.pass_context
def cli(ctx: click.Context, **kwargs: Any) -> AMZN:
    # ``kwargs`` carries the parsed command-line arguments (here: ``title``),
    # which are forwarded unchanged to the constructor.
    return AMZN(ctx, **kwargs)
def __init__(self, ctx: click.Context, title: str) -> None:
    """Remember the requested title, then run the parent initialiser.

    Args:
        ctx: Click context, handed straight to the parent constructor.
        title: Title argument from the command line; it is matched against
            ``TITLE_RE`` later, in ``get_titles``.
    """
    # Content ID extracted from ``title``; stays ``None`` until ``get_titles`` runs.
    self._title_id: str | None = None
    self.title = title
    super().__init__(ctx)
# ------------------------------------------------------------------
# Session / Authentication
# ------------------------------------------------------------------
def get_session(self) -> requests.Session:
    """Return the parent's session with this service's default headers applied.

    The headers ask for JSON responses and carry ``Sec-Fetch-*`` hints plus a
    desktop Edge/Chromium ``User-Agent`` string.
    """
    user_agent = (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/146.0.0.0 Safari/537.36 Edg/146.0.0.0"
    )
    default_headers = {
        "Accept": "application/json",
        "Sec-Fetch-Dest": "empty",
        "Sec-Fetch-Mode": "cors",
        "Sec-Fetch-Site": "same-site",
        "User-Agent": user_agent,
    }

    http_session = super().get_session()
    http_session.headers.update(default_headers)
    return http_session
def authenticate(
    self,
    cookies: MozillaCookieJar | None = None,
    credential: Credential | None = None,
) -> None:
    """Authenticate via cookies and cache the device ID and customer config.

    The parent's ``authenticate`` runs first. Cookies are mandatory for this
    service; ``credential`` is only forwarded to the parent.

    Raises:
        ValueError: If ``cookies`` is missing or empty, or if the device ID /
            customer config cannot be resolved afterwards.
    """
    super().authenticate(cookies, credential)

    if cookies:
        # The device ID must be known first: the customer-config request uses it.
        self.device_id = self._get_device_id()
        self.customer_config = self._get_customer_config()
        return

    raise ValueError(
        "Authentication for Amazon Prime Video requires cookies. "
        "Please provide a MozillaCookieJar with the necessary cookies."
    )
# ------------------------------------------------------------------
# Device / Customer config helpers
# ------------------------------------------------------------------
def _get_device_id(self) -> str:
"""Extract the DeviceId from the av-native-app cookie.
Cookie format: ``AppName=PVPWA-MICROSOFT_STORE|DeviceId=<ID>``
"""
for cookie in self.session.cookies:
if cookie.name == "av-native-app":
parts = dict(pair.split("=", 1) for pair in cookie.value.split("|") if "=" in pair)
device_id = parts.get("DeviceId")
if device_id:
return device_id
raise ValueError("Device ID not found in cookies.")
def _get_customer_config(self) -> dict:
"""Fetch and return the customer config section from Amazon's startup config."""
data = self._fetch_customer_config()
customer_config = data.get("customerConfig", {})
if not customer_config:
raise ValueError("Customer config not found in response.")
return customer_config
# ------------------------------------------------------------------
# Title retrieval
# ------------------------------------------------------------------
def get_titles(self) -> Movies | Series:
    """Resolve ``self.title`` to its metadata and parse it as a movie or a season.

    The content ID is taken from the ``id`` group of ``TITLE_RE`` and cached on
    ``self._title_id`` before the metadata request is made.

    Returns:
        The result of ``_parse_movie`` when the page's ``subPageType`` is
        ``"Movie"``, or of ``_parse_series`` when it is ``"Season"``.

    Raises:
        ValueError: If the title does not match ``TITLE_RE``, no metadata is
            returned, or the page type is neither ``"Movie"`` nor ``"Season"``.
    """
    match = re.match(self.TITLE_RE, self.title)
    if match is None:
        raise ValueError(f"Invalid title URL format: {self.title}")

    # No emptiness check needed: the "id" group is ``[A-Z0-9]+`` and therefore
    # cannot match an empty string.
    content_id = match.group("id")
    self._title_id = content_id

    metadata = self._fetch_metadata(content_id)
    if not metadata:
        raise ValueError(f"Failed to fetch metadata for content ID: {content_id}.")

    # ``or {}`` also covers explicit JSON nulls, so a malformed page ends in the
    # "Unsupported content type" error below rather than an AttributeError.
    page_metadata = (metadata.get("head") or {}).get("pageMetadata") or {}
    sub_page_type = page_metadata.get("subPageType")

    if sub_page_type == "Movie":
        return self._parse_movie(metadata)
    if sub_page_type == "Season":
        return self._parse_series(metadata)
    raise ValueError(f"Unsupported content type: {sub_page_type}")
def _parse_movie(self, metadata: dict) -> Movies:
    """Build a single-entry ``Movies`` collection from detail-page metadata.

    Everything is read from ``body.atf.state``, keyed by its ``pageTitleId``.
    The playback envelope comes from the first primary action whose
    ``actionType`` is ``"PLAY"``; it is ``None`` when no such action exists.
    """
    state = metadata.get("body", {}).get("atf", {}).get("state", {})
    title_id = state.get("pageTitleId", "")

    header = state.get("detail", {}).get("headerDetail", {}).get(title_id, {})
    own_entry = state.get("self", {}).get(title_id, {})
    name = header.get("title", "Unknown Movie")
    year = header.get("releaseYear")
    compact_gti = own_entry.get("compactGTI", "")

    primary_actions = (
        state.get("action", {}).get("atf", {}).get(title_id, {}).get("primaryActions", [])
    )
    # Only the first PLAY action counts, even if it carries no envelope.
    envelope = next(
        (
            action.get("payload", {}).get("playback", {}).get("playbackEnvelope")
            for action in primary_actions
            if action.get("actionType") == "PLAY"
        ),
        None,
    )

    movie = Movie(
        id_=title_id,
        service=AMZN,
        name=name,
        data={
            "pageTitleId": title_id,
            "detailId": compact_gti,
            "gti": title_id,
            "playback_envelope": envelope,
        },
        year=year,
    )
    return Movies([movie])
def _parse_series(self, metadata: dict) -> Series:
"""Parse series metadata and recursively fetch all episodes."""
atf_state = metadata.get("body", {}).get("atf", {}).get("state", {})
header_detail = atf_state.get("detail", {}).get("headerDetail", {})
page_title_id = atf_state.get("pageTitleId", "")
header_info = header_detail.get(page_title_id, {})
series_name = header_info.get("parentTitle", "Unknown Series")
season_list = atf_state.get("seasons", {}).get(page_title_id, [])
all_episodes: list[Episode] = []
if season_list:
for season in season_list:
season_id = season.get("seasonId", "")
season_number = season.get("sequenceNumber", 1)
season_metadata = self._fetch_metadata(season_id)
if season_metadata:
all_episodes.extend(
self._parse_season_episodes(season_metadata, series_name, season_number)
)
else:
# No seasons — try to parse episodes directly from the current page
all_episodes.extend(self._parse_season_episodes(metadata, series_name, 1))
if not all_episodes:
raise ValueError(f"No episodes found for series: {series_name}")
return Series(all_episodes)
def _parse_season_episodes(
self, metadata: dict, series_name: str, season_number: int
) -> list[Episode]:
"""Parse episodes from a season's metadata."""
atf_state = metadata.get("body", {}).get("atf", {}).get("state", {})
btf_state = metadata.get("body", {}).get("btf", {}).get("state", {})
all_episodes: dict[str, Episode] = {}
# BTF episodes (primary source)
episodes_detail_btf = btf_state.get("detail", {}).get("detail", {})
actions_btf = btf_state.get("action", {}).get("btf", {})
for gti, info in episodes_detail_btf.items():
if info.get("titleType") == "episode":
episode = self._build_episode(gti, info, actions_btf.get(gti, {}), series_name, season_number)
if episode:
all_episodes[gti] = episode
# ATF episodes (fallback / supplement)
episodes_detail_atf = atf_state.get("detail", {}).get("detail", {})
actions_atf = atf_state.get("action", {}).get("atf", {})
for gti, info in episodes_detail_atf.items():
if info.get("titleType") == "episode" and gti not in all_episodes:
episode = self._build_episode(gti, info, actions_atf.get(gti, {}), series_name, season_number)
if episode:
all_episodes[gti] = episode
return list(all_episodes.values())
def _build_episode(
    self,
    gti: str,
    info: dict,
    actions: dict,
    series_name: str,
    season_number: int,
) -> Episode | None:
    """Create an ``Episode`` from one detail entry and its action block.

    Args:
        gti: Identifier of the episode; used as the episode ID and stored in ``data``.
        info: Detail entry providing ``title`` and ``episodeNumber``.
        actions: Action block whose ``primaryActions`` may contain a PLAY action.
        series_name: Name of the parent series.
        season_number: Season the episode is filed under.

    Returns:
        The episode. Its ``data["playback_envelope"]`` is taken from the first
        PLAY primary action and is ``None`` when there is none.
    """
    name = info.get("title", "Unknown Episode")
    number = info.get("episodeNumber")

    # Lazily yields the envelope of each PLAY action; only the first one is used.
    play_envelopes = (
        action.get("payload", {}).get("playback", {}).get("playbackEnvelope")
        for action in actions.get("primaryActions", [])
        if action.get("actionType") == "PLAY"
    )
    envelope = next(play_envelopes, None)

    return Episode(
        id_=gti,
        service=AMZN,
        name=name,
        season=season_number,
        number=number,
        title=series_name,
        data={"gti": gti, "playback_envelope": envelope},
    )
# ------------------------------------------------------------------
# Track retrieval
# ------------------------------------------------------------------
def get_tracks(self, title: Episode | Movie) -> Tracks:
    """Fetch the playback sources of ``title`` and return its tracks.

    The DASH manifest tracks are tagged with the session handoff token (and
    the Widevine service certificate, when one was returned) so the license
    methods can read them from ``track.data``. Subtitles listed in the
    playback data are appended to the manifest tracks.

    Raises:
        TypeError: If ``title`` is neither an ``Episode`` nor a ``Movie``.
        ValueError: If the GTI, playback envelope, playback data, session
            handoff token or manifest URL is missing.
    """
    if not isinstance(title, (Episode, Movie)):
        raise TypeError(f"Expected Episode or Movie, got {type(title).__name__}.")

    content_id = title.data.get("gti")
    if not content_id:
        raise ValueError("No content ID found in title data.")

    envelope = title.data.get("playback_envelope")
    if not envelope:
        raise ValueError("No playback_envelope found in title.data.")

    playback_data = self._fetch_playback_sources(envelope, content_id)
    if not playback_data:
        raise ValueError(f"Failed to fetch playback sources for title ID: {content_id}.")

    certificate = self._extract_widevine_service_certificate(playback_data)
    drm_metadata = {"session_handoff_token": self._extract_session_handoff_token(playback_data)}
    if certificate:
        drm_metadata["widevine_service_certificate"] = certificate

    manifest_url = self._extract_manifest_url(playback_data)
    if not manifest_url:
        raise ValueError("No manifest URL found in playback sources.")

    manifest_tracks = DASH.from_url(manifest_url, self.session).to_tracks(language="de")

    # Copy the DRM metadata onto every manifest track.
    for track in manifest_tracks:
        for key, value in drm_metadata.items():
            track.data[key] = value

    return Tracks(manifest_tracks + self._extract_subtitle_tracks(playback_data))
def get_chapters(self, title: Movies | Series) -> Chapters:
    """Return an empty ``Chapters`` collection; ``title`` is not inspected."""
    # NOTE(review): the annotation says Movies | Series while get_tracks takes
    # Episode | Movie — confirm which one the base class expects.
    return Chapters()
# ------------------------------------------------------------------
# Widevine DRM
# ------------------------------------------------------------------
def get_widevine_service_certificate(
    self, *, challenge: bytes, title: Movies | Series, track: AnyTrack
) -> bytes | None:
    """Look up the service certificate that ``get_tracks`` stored on the track.

    ``challenge`` and ``title`` are accepted but not used.

    Returns:
        ``track.data["widevine_service_certificate"]``, or ``None`` when the
        key is absent.
    """
    certificate = track.data.get("widevine_service_certificate")
    return certificate
def get_widevine_license(
    self,
    *,
    challenge: bytes,
    title: Movies | Series,
    track: AnyTrack,
) -> bytes | None:
    """Request a Widevine license for ``track`` from the configured licence endpoint.

    Args:
        challenge: Raw license challenge; sent base64-encoded.
        title: Title whose ``data`` holds ``gti`` and ``playback_envelope``.
        track: Track whose ``data`` holds the ``session_handoff_token``.

    Returns:
        The decoded license bytes, or ``None`` (with a warning logged) when
        the response carries no license.

    Raises:
        ValueError: If the playback envelope or session handoff token is missing.
        requests.HTTPError: If the endpoint answers with an error status.
    """
    licence_endpoint = self.config["endpoints"]["licence_endpoint"]
    device_type_id = self.config["devine"]["deviceTypeID"]
    title_id = title.data.get("gti", "")

    playback_envelope = title.data.get("playback_envelope")
    if not playback_envelope:
        raise ValueError("No playback envelope found in title data.")

    session_handoff_token = track.data.get("session_handoff_token")
    if not session_handoff_token:
        raise ValueError("No session handoff token found in track data.")

    params = {
        "deviceID": self.device_id,
        "deviceTypeID": device_type_id,
        "marketplaceID": self.customer_config.get("marketplaceID", ""),
        "titleId": title_id,
    }
    json_data = {
        "includeHdcpTestKey": True,
        "playbackEnvelope": playback_envelope,
        "sessionHandoffToken": session_handoff_token,
        "licenseChallenge": base64.b64encode(challenge).decode("utf-8"),
    }

    response = self.session.post(
        licence_endpoint,
        headers={**self.session.headers, "Content-Type": "application/json"},
        params=params,
        json=json_data,
    )
    response.raise_for_status()

    # ``or {}`` tolerates an explicit null under "widevineLicense".
    encoded_license = (response.json().get("widevineLicense") or {}).get("license")
    if not encoded_license:
        # A 2xx answer without a license is not a success; make it visible.
        self.log.warning("License response did not contain a Widevine license.")
        return None

    self.log.info("Successfully fetched Widevine license.")
    return base64.b64decode(encoded_license)
# ------------------------------------------------------------------
# Playback data extraction helpers
# ------------------------------------------------------------------
def _extract_manifest_url(self, playback_data: dict) -> str | None:
"""Extract the DASH manifest URL from playback sources data."""
intra_title = (
playback_data
.get("vodPlaylistedPlaybackUrls", {})
.get("result", {})
.get("playbackUrls", {})
.get("intraTitlePlaylist", [])
)
if not intra_title:
return None
# Prefer the section explicitly marked as "Main"
for section in intra_title:
if section.get("type") == "Main":
urls = section.get("urls", [])
if urls:
return urls[0].get("url")
# Fallback: first available URL (older manifests may lack a "type" key)
for section in intra_title:
urls = section.get("urls", [])
if urls:
return urls[0].get("url")
return None
def _extract_subtitle_tracks(self, playback_data: dict) -> list[Subtitle]:
"""Extract subtitle tracks from playback sources data."""
timed_text = playback_data.get("timedTextUrls", {}).get("result", {})
subtitle_tracks: list[Subtitle] = []
for subtitle in timed_text.get("subtitleUrls", []):
language = subtitle.get("languageCode", "")
url = subtitle.get("url", "")
fmt = subtitle.get("format", "")
sub_type = subtitle.get("type", "")
if fmt == "TTMLv2":
fmt = "ttml"
if language and url:
subtitle_tracks.append(Subtitle(
id_=f"{language}_{sub_type}_{fmt}",
url=url,
codec=Subtitle.Codec.from_mime(fmt),
language=language,
is_original_lang=False,
forced=False,
sdh=True,
))
return subtitle_tracks
def _extract_widevine_service_certificate(self, playback_data: dict) -> bytes | None:
"""Decode and return the Widevine service certificate from playback data.
Expected structure::
{
"widevineServiceCertificate": {
"result": {
"encodedServiceCertificate": "<BASE64>"
}
}
}
"""
encoded_cert = (
playback_data
.get("widevineServiceCertificate", {})
.get("result", {})
.get("encodedServiceCertificate")
)
if encoded_cert:
self.log.info("Successfully extracted Widevine service certificate.")
return base64.b64decode(encoded_cert)
self.log.warning("No Widevine service certificate found in playback data.")
return None
def _extract_session_handoff_token(self, playback_data: dict) -> str:
"""Extract the session handoff token from playback data.
Expected structure::
{
"sessionization": {
"sessionHandoffToken": "<TOKEN>"
}
}
"""
token = playback_data.get("sessionization", {}).get("sessionHandoffToken")
if token:
return token
raise ValueError("No session handoff token found in playback data.")
# ------------------------------------------------------------------
# Network fetch helpers
# ------------------------------------------------------------------
def _fetch_metadata(self, content_id: str) -> dict | None:
"""Fetch JSON metadata for a given content ID from the detail page."""
url = self.config["endpoints"]["metadata_endpoint"].format(id=content_id)
response = self.session.get(url)
response.raise_for_status()
return response.json()
def _fetch_enriched_metadata(self, content_ids: list[str]) -> dict | None:
"""Fetch enriched hover/playback metadata for a list of content IDs.
This method is available for future use (e.g. pre-fetching playback
envelopes in bulk) but is not called by the main ingest flow.
"""
url = self.config["endpoints"]["enriched_metadata_endpoint"]
data = {
"metadataToEnrich": json.dumps({
"placement": "HOVER",
"playback": True,
"preroll": True,
"trailer": True,
"watchlist": True,
}),
"titleIDsToEnrich": json.dumps(content_ids),
"journeyIngressContext": "",
"currentUrl": "https://www.primevideo.com/",
}
response = self.session.post(
url,
headers={**self.session.headers, "Content-Type": "application/x-www-form-urlencoded"},
data=data,
)
response.raise_for_status()
return response.json()
def _fetch_playback_sources(self, playback_envelope: str, title_id: str) -> dict | None:
"""Fetch all playback resources (manifest, DRM, subtitles) for a title."""
url = self.config["endpoints"]["playback_sources_endpoint"]
device_type_id = self.config["devine"]["deviceTypeID"]
params = {
"deviceID": self.device_id,
"deviceTypeID": device_type_id,
"marketplaceID": self.customer_config.get("marketplaceID", ""),
"titleId": title_id,
}
json_data = {
"globalParameters": {
"deviceCapabilityFamily": "WebPlayer",
"playbackEnvelope": playback_envelope,
"capabilityDiscriminators": {
"operatingSystem": {"name": "Windows", "version": "Unknown"},
"deviceModel": {"name": "Unknown", "version": "UNKNOWN"},
"nativeApplication": {"name": "PVPWA", "version": "Unknown"},
"hfrControlMode": "Legacy",
"displayResolution": {"height": 1440, "width": 2560},
},
},
"timedTextUrlsRequest": {
"supportedTimedTextFormats": ["TTMLv2", "DFXP"],
},
"vodPlaylistedPlaybackUrlsRequest": {
"device": {
"hdcpLevel": "1.4",
"maxVideoResolution": "1080p",
"supportedStreamingTechnologies": ["DASH"],
"streamingTechnologies": {
"DASH": {
"bitrateAdaptations": ["CBR", "CVBR"],
"codecs": ["H264"],
"drmKeyScheme": "DualKey",
"drmType": "Widevine",
"dynamicRangeFormats": ["None"],
"edgeDeliveryAuthorizationSchemes": ["PVExchangeV1", "Transparent"],
"fragmentRepresentations": ["ByteOffsetRange", "SeparateFile"],
"frameRates": ["Standard", "High"],
"stitchType": "MultiPeriod",
"segmentInfoType": "Base",
"timedTextRepresentations": [
"NotInManifestNorStream",
"SeparateStreamInManifest",
],
"trickplayRepresentations": ["NotInManifestNorStream"],
"variableAspectRatio": "supported",
}
},
"displayWidth": 2560,
"displayHeight": 1440,
},
"playbackSettingsRequest": {
"deviceModel": "Unknown",
"firmware": "UNKNOWN",
"playerType": "xp",
"responseFormatVersion": "1.0.0",
"titleId": title_id,
},
},
"vodXrayMetadataRequest": {
"xrayDeviceClass": "normal",
"xrayPlaybackMode": "playback",
"xrayToken": "XRAY_WEB_2023_V2",
},
}
response = self.session.post(url, params=params, json=json_data)
response.raise_for_status()
return response.json()
def _fetch_customer_config(self) -> dict:
"""Fetch the Amazon startup/customer config for this device."""
url = self.config["endpoints"]["customer_config_endpoint"]
device_type_id = self.config["devine"]["deviceTypeID"]
params = {
"deviceID": self.device_id,
"deviceTypeID": device_type_id,
"format": "json",
}
response = self.session.get(url, params=params)
response.raise_for_status()
return response.json()