--- /dev/null
+"""MLB.TV Music Provider for Music Assistant.
+
+Provides access to live MLB radio broadcasts via the MLB API.
+Based on the API patterns from the plugin.video.mlbtv Kodi add-on
+by eracknaphobia (https://github.com/eracknaphobia/plugin.video.mlbtv).
+"""
+
+from __future__ import annotations
+
+import calendar
+from collections.abc import Sequence
+from datetime import UTC, datetime, timedelta
+from typing import TYPE_CHECKING, Any
+from urllib.parse import quote
+
+from music_assistant_models.config_entries import ConfigEntry, ConfigValueType
+from music_assistant_models.enums import (
+ ConfigEntryType,
+ ContentType,
+ ImageType,
+ MediaType,
+ ProviderFeature,
+ StreamType,
+)
+from music_assistant_models.errors import LoginFailed, MediaNotFoundError
+from music_assistant_models.media_items import (
+ AudioFormat,
+ BrowseFolder,
+ ItemMapping,
+ MediaItemImage,
+ MediaItemMetadata,
+ MediaItemType,
+ ProviderMapping,
+ Radio,
+ UniqueList,
+)
+from music_assistant_models.streamdetails import StreamDetails
+
+from music_assistant.constants import CONF_PASSWORD, CONF_USERNAME
+from music_assistant.models.music_provider import MusicProvider
+
+if TYPE_CHECKING:
+ from music_assistant_models.config_entries import ProviderConfig
+ from music_assistant_models.provider import ProviderManifest
+
+ from music_assistant.mass import MusicAssistant
+ from music_assistant.models import ProviderInstanceType
+
+
+API_URL = "https://statsapi.mlb.com"
+OKTA_TOKEN_URL = "https://ids.mlb.com/oauth2/aus1m088yK07noBfh356/v1/token"
+OKTA_CLIENT_ID = "0oa3e1nutA1HLzAKG356"
+MEDIA_GATEWAY_URL = "https://media-gateway.mlb.com/graphql"
+
+UA_PC = (
+ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
+ "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36"
+)
+UA_ANDROID = "okhttp/3.12.1"
+
+MLB_SPORT_ID = "1"
+MLB_TEAM_IDS = (
+ "108,109,110,111,112,113,114,115,116,117,118,119,120,121,"
+ "133,134,135,136,137,138,139,140,141,142,143,144,145,146,147,"
+ "158,159,160"
+)
+
+SUPPORTED_FEATURES = {
+ ProviderFeature.BROWSE,
+}
+
+
+async def setup(
+ mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
+) -> ProviderInstanceType:
+ """Initialize provider(instance) with given configuration."""
+ if not config.get_value(CONF_USERNAME) or not config.get_value(CONF_PASSWORD):
+ msg = "Username and password are required"
+ raise LoginFailed(msg)
+ return MLBProvider(mass, manifest, config, SUPPORTED_FEATURES)
+
+
+async def get_config_entries(
+ mass: MusicAssistant,
+ instance_id: str | None = None,
+ action: str | None = None,
+ values: dict[str, ConfigValueType] | None = None,
+) -> tuple[ConfigEntry, ...]:
+ """Return Config entries to setup this provider.
+
+ :param mass: The MusicAssistant instance.
+ :param instance_id: id of an existing provider instance (None if new instance setup).
+ :param action: action key called from config entries UI.
+ :param values: the (intermediate) raw values for config entries sent with the action.
+ """
+ # ruff: noqa: ARG001
+ return (
+ ConfigEntry(
+ key=CONF_USERNAME,
+ type=ConfigEntryType.STRING,
+ label="MLB.TV Username (email)",
+ required=True,
+ ),
+ ConfigEntry(
+ key=CONF_PASSWORD,
+ type=ConfigEntryType.SECURE_STRING,
+ label="MLB.TV Password",
+ required=True,
+ ),
+ )
+
+
+class MLBProvider(MusicProvider):
+ """Music provider for MLB.TV radio broadcasts."""
+
+ _login_token: str | None = None
+ _login_token_expiry: datetime | None = None
+ _device_id: str | None = None
+ _session_id: str | None = None
+
+ @property
+ def is_streaming_provider(self) -> bool:
+ """Return True if the provider is a streaming provider."""
+ return True
+
+ async def handle_async_init(self) -> None:
+ """Handle async initialization of the provider."""
+ await self._ensure_login()
+
+ async def unload(self, is_removed: bool = False) -> None:
+ """Handle unload/close of the provider."""
+ self._login_token = None
+ self._login_token_expiry = None
+ self._device_id = None
+ self._session_id = None
+
+ # =========================================================================
+ # Authentication
+ # =========================================================================
+
+ async def _ensure_login(self) -> str:
+ """Ensure we have a valid login token, refreshing if needed."""
+ if (
+ self._login_token
+ and self._login_token_expiry
+ and self._login_token_expiry > datetime.now(tz=UTC)
+ ):
+ return self._login_token
+
+ username = self.config.get_value(CONF_USERNAME)
+ password = self.config.get_value(CONF_PASSWORD)
+
+ headers = {
+ "User-Agent": UA_ANDROID,
+ "Content-Type": "application/x-www-form-urlencoded",
+ }
+ payload = (
+ f"grant_type=password"
+ f"&username={quote(str(username))}"
+ f"&password={quote(str(password))}"
+ f"&scope=openid offline_access"
+ f"&client_id={OKTA_CLIENT_ID}"
+ )
+
+ async with self.mass.http_session.post(
+ OKTA_TOKEN_URL, headers=headers, data=payload
+ ) as resp:
+ if resp.status != 200:
+ body = await resp.json()
+ error_desc = body.get("error_description", "Login failed")
+ msg = f"MLB.TV login failed: {error_desc}"
+ raise LoginFailed(msg)
+ data = await resp.json()
+
+ self._login_token = data["access_token"]
+ self._login_token_expiry = datetime.now(tz=UTC) + timedelta(
+ seconds=int(data.get("expires_in", 3600)) - 60
+ )
+ self.logger.debug("MLB.TV login successful, token expires at %s", self._login_token_expiry)
+
+ await self._init_session()
+ return self._login_token
+
+ async def _init_session(self) -> None:
+ """Initialize a media gateway session to get device_id and session_id."""
+ token = self._login_token
+ if not token:
+ return
+
+ headers = {
+ "User-Agent": UA_PC,
+ "Authorization": f"Bearer {token}",
+ "Content-Type": "application/json",
+ "Accept": "application/json",
+ }
+ data = {
+ "operationName": "initSession",
+ "query": (
+ "mutation initSession("
+ "$device: InitSessionInput!, $clientType: ClientType!, "
+ "$experience: ExperienceTypeInput"
+ ") { initSession(device: $device, clientType: $clientType, "
+ "experience: $experience) { deviceId sessionId "
+ "entitlements { code } "
+ "location { countryCode zipCode } } }"
+ ),
+ "variables": {
+ "device": {
+ "appVersion": "7.8.2",
+ "deviceFamily": "desktop",
+ "knownDeviceId": "",
+ "languagePreference": "ENGLISH",
+ "manufacturer": "Google Inc.",
+ "model": "",
+ "os": "windows",
+ "osVersion": "10",
+ },
+ "clientType": "WEB",
+ },
+ }
+
+ async with self.mass.http_session.post(
+ MEDIA_GATEWAY_URL, headers=headers, json=data
+ ) as resp:
+ result = await resp.json()
+
+ session_data = result.get("data", {}).get("initSession", {})
+ self._device_id = session_data.get("deviceId", "")
+ self._session_id = session_data.get("sessionId", "")
+ entitlements = [e["code"] for e in session_data.get("entitlements", [])]
+ self.logger.debug(
+ "MLB session initialized: device=%s, entitlements=%s",
+ self._device_id,
+ entitlements,
+ )
+
+ # =========================================================================
+ # Browse
+ # =========================================================================
+
+ async def browse(self, path: str) -> Sequence[MediaItemType | ItemMapping | BrowseFolder]:
+ """Browse this provider's items.
+
+ :param path: The path to browse, (e.g. provider_id://artists).
+ """
+ prefix = f"{self.instance_id}://"
+ relative_path = path.removeprefix(prefix)
+
+ if not relative_path or relative_path == "/":
+ return await self._browse_root()
+
+ if relative_path.startswith("game/"):
+ game_pk = relative_path.split("/")[1]
+ return await self._browse_game_feeds(game_pk)
+
+ return []
+
+ async def _browse_root(self) -> list[BrowseFolder | Radio]:
+ """Browse the root level: today's games."""
+ game_date = self._get_game_date()
+ games = await self._fetch_schedule(game_date)
+ items: list[BrowseFolder | Radio] = []
+
+ for game in games:
+ game_pk = str(game["gamePk"])
+ away_team = game["teams"]["away"]["team"].get("teamName", "Away")
+ home_team = game["teams"]["home"]["team"].get("teamName", "Home")
+ away_abbr = game["teams"]["away"]["team"].get("abbreviation", "???")
+ home_abbr = game["teams"]["home"]["team"].get("abbreviation", "???")
+ state = game.get("status", {}).get("detailedState", "Scheduled")
+
+ game_time_str = ""
+ if "gameDate" in game:
+ game_utc = datetime.strptime(game["gameDate"], "%Y-%m-%dT%H:%M:%SZ").replace(
+ tzinfo=UTC
+ )
+ local_time = datetime.fromtimestamp(
+ calendar.timegm(game_utc.timetuple()), tz=UTC
+ ).astimezone()
+ game_time_str = local_time.strftime("%I:%M %p").lstrip("0") + " "
+
+ if state in ("In Progress", "Live"):
+ status_prefix = "[LIVE] "
+ elif state in ("Final", "Game Over"):
+ status_prefix = "[Final] "
+ else:
+ status_prefix = game_time_str
+
+ label = f"{status_prefix}{away_abbr} at {home_abbr}"
+
+ audio_feeds = self._get_audio_feeds(game)
+ if len(audio_feeds) == 1:
+ feed = audio_feeds[0]
+ radio = self._feed_to_radio(feed, away_team, home_team, game)
+ items.append(radio)
+ elif len(audio_feeds) > 1:
+ folder = BrowseFolder(
+ item_id=f"game/{game_pk}",
+ provider=self.instance_id,
+ path=f"{self.instance_id}://game/{game_pk}",
+ name=label,
+ )
+ home_id = game["teams"]["home"]["team"]["id"]
+ folder.image = MediaItemImage(
+ type=ImageType.THUMB,
+ path=f"https://www.mlbstatic.com/team-logos/share/{home_id}.jpg",
+ provider=self.instance_id,
+ remotely_accessible=True,
+ )
+ items.append(folder)
+ elif len(audio_feeds) == 0 and state not in ("Final", "Game Over", "Postponed"):
+ folder = BrowseFolder(
+ item_id=f"game/{game_pk}",
+ provider=self.instance_id,
+ path=f"{self.instance_id}://game/{game_pk}",
+ name=f"{label} (no audio feeds yet)",
+ )
+ items.append(folder)
+
+ return items
+
+ async def _browse_game_feeds(self, game_pk: str) -> list[Radio]:
+ """Browse the audio feeds for a specific game."""
+ game_date = self._get_game_date()
+ games = await self._fetch_schedule(game_date, game_pk=game_pk)
+ items: list[Radio] = []
+ for game in games:
+ if str(game["gamePk"]) == game_pk:
+ away_team = game["teams"]["away"]["team"].get("teamName", "Away")
+ home_team = game["teams"]["home"]["team"].get("teamName", "Home")
+ for feed in self._get_audio_feeds(game):
+ items.append(self._feed_to_radio(feed, away_team, home_team, game))
+ break
+ return items
+
+ def _feed_to_radio(
+ self, feed: dict[str, Any], away_team: str, home_team: str, game: dict[str, Any]
+ ) -> Radio:
+ """Convert a broadcast feed dict into a Radio item."""
+ media_id = feed["mediaId"]
+ call_sign = feed.get("callSign", "")
+ home_away = feed.get("homeAway", "home").capitalize()
+ language = feed.get("language", "en")
+
+ lang_label = ""
+ if language == "es":
+ lang_label = " Spanish"
+
+ away_abbr = game["teams"]["away"]["team"].get("abbreviation", "???")
+ home_abbr = game["teams"]["home"]["team"].get("abbreviation", "???")
+ name = f"{away_abbr} at {home_abbr} - {home_away}{lang_label} Radio ({call_sign})"
+
+ home_id = game["teams"]["home"]["team"]["id"]
+
+ return Radio(
+ item_id=media_id,
+ provider=self.instance_id,
+ name=name,
+ provider_mappings={
+ ProviderMapping(
+ item_id=media_id,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ audio_format=AudioFormat(
+ content_type=ContentType.UNKNOWN,
+ ),
+ available=True,
+ )
+ },
+ metadata=MediaItemMetadata(
+ description=f"{away_team} at {home_team}",
+ images=UniqueList(
+ [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=f"https://www.mlbstatic.com/team-logos/share/{home_id}.jpg",
+ provider=self.instance_id,
+ remotely_accessible=True,
+ )
+ ]
+ ),
+ ),
+ )
+
+ # =========================================================================
+ # Stream details
+ # =========================================================================
+
+ async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
+ """Get streamdetails for a radio broadcast.
+
+ :param item_id: The media ID of the broadcast feed.
+ :param media_type: The media type.
+ """
+ token = await self._ensure_login()
+ if not self._device_id or not self._session_id:
+ await self._init_session()
+
+ stream_url = await self._get_playback_url(item_id, token)
+ if not stream_url:
+ msg = f"Could not resolve stream for media ID {item_id}"
+ raise MediaNotFoundError(msg)
+
+ return StreamDetails(
+ provider=self.instance_id,
+ item_id=item_id,
+ audio_format=AudioFormat(content_type=ContentType.UNKNOWN),
+ media_type=MediaType.RADIO,
+ stream_type=StreamType.HLS,
+ path=stream_url,
+ allow_seek=False,
+ can_seek=False,
+ extra_input_args=["-user_agent", UA_PC],
+ )
+
+ async def _get_playback_url(self, media_id: str, token: str) -> str | None:
+ """Resolve a media ID to an HLS stream URL via the GraphQL playback endpoint."""
+ headers = {
+ "User-Agent": UA_PC,
+ "Authorization": f"Bearer {token}",
+ "Content-Type": "application/json",
+ "Accept": "application/json",
+ }
+ data = {
+ "operationName": "initPlaybackSession",
+ "query": (
+ "mutation initPlaybackSession("
+ "$adCapabilities: [AdExperienceType] "
+ "$mediaId: String! "
+ "$deviceId: String! "
+ "$sessionId: String! "
+ "$quality: PlaybackQuality"
+ ") { initPlaybackSession("
+ "adCapabilities: $adCapabilities "
+ "mediaId: $mediaId "
+ "deviceId: $deviceId "
+ "sessionId: $sessionId "
+ "quality: $quality"
+ ") { playbackSessionId "
+ "playback { url token expiration cdn } "
+ "heartbeatInfo { url interval } } }"
+ ),
+ "variables": {
+ "adCapabilities": ["GOOGLE_STANDALONE_AD_PODS"],
+ "mediaId": media_id,
+ "quality": "PLACEHOLDER",
+ "deviceId": self._device_id or "",
+ "sessionId": self._session_id or "",
+ },
+ }
+
+ async with self.mass.http_session.post(
+ MEDIA_GATEWAY_URL, headers=headers, json=data
+ ) as resp:
+ result = await resp.json()
+
+ if "errors" in result:
+ error_codes = [e.get("code", e.get("message", "unknown")) for e in result["errors"]]
+ self.logger.error("Playback session errors: %s", error_codes)
+ return None
+
+ playback = result.get("data", {}).get("initPlaybackSession", {}).get("playback", {})
+ url: str | None = playback.get("url")
+ return url
+
+ # =========================================================================
+ # MLB Stats API helpers
+ # =========================================================================
+
+ async def _fetch_schedule(
+ self, game_date: str, game_pk: str | None = None
+ ) -> list[dict[str, Any]]:
+ """Fetch games from the MLB Stats API for a given date.
+
+ :param game_date: Date string in YYYY-MM-DD format.
+ :param game_pk: Optional specific game PK to fetch.
+ """
+ url = f"{API_URL}/api/v1/schedule"
+ params: dict[str, str] = {
+ "hydrate": "broadcasts(all),game(content(media(epg))),probablePitcher,linescore,team",
+ "sportId": MLB_SPORT_ID,
+ "date": game_date,
+ }
+ if game_pk:
+ params["gamePk"] = game_pk
+ else:
+ params["teamId"] = MLB_TEAM_IDS
+
+ headers = {"User-Agent": UA_ANDROID}
+
+ async with self.mass.http_session.get(url, params=params, headers=headers) as resp:
+ data = await resp.json()
+
+ games: list[dict[str, Any]] = []
+ for date_entry in data.get("dates", []):
+ games.extend(date_entry.get("games", []))
+ return games
+
+ @staticmethod
+ def _get_audio_feeds(game: dict[str, Any]) -> list[dict[str, Any]]:
+ """Extract available audio broadcast feeds from a game object."""
+ audio_feeds = []
+ for broadcast in game.get("broadcasts", []):
+ if broadcast.get("type") in ("FM", "AM"):
+ media_state = broadcast.get("mediaState", {}).get("mediaStateCode", "MEDIA_OFF")
+ if media_state != "MEDIA_OFF":
+ audio_feeds.append(broadcast)
+ return audio_feeds
+
+ @staticmethod
+ def _get_game_date() -> str:
+ """Get the current game date in Eastern time.
+
+ MLB game dates don't roll over until ~4 AM ET so late-night games
+ still appear under the previous calendar day.
+ """
+ utc_now = datetime.now(tz=UTC)
+ eastern_offset = timedelta(hours=-5)
+ eastern_now = utc_now + eastern_offset
+ if eastern_now.hour < 4:
+ eastern_now -= timedelta(days=1)
+ return eastern_now.strftime("%Y-%m-%d")
+
+ async def get_radio(self, prov_radio_id: str) -> Radio:
+ """Get full radio details by id.
+
+ :param prov_radio_id: The media ID of the broadcast feed.
+ """
+ game_date = self._get_game_date()
+ games = await self._fetch_schedule(game_date)
+ for game in games:
+ away_team = game["teams"]["away"]["team"].get("teamName", "Away")
+ home_team = game["teams"]["home"]["team"].get("teamName", "Home")
+ for feed in self._get_audio_feeds(game):
+ if feed["mediaId"] == prov_radio_id:
+ return self._feed_to_radio(feed, away_team, home_team, game)
+ msg = f"Radio feed {prov_radio_id} not found"
+ raise MediaNotFoundError(msg)
+
+ async def resolve_image(self, path: str) -> str | bytes:
+ """Resolve an image from an image path."""
+ return path