From f38d536d4fede1ff432bac917bfee05636ce0bef Mon Sep 17 00:00:00 2001 From: hatharry Date: Wed, 11 Feb 2026 02:48:47 +1300 Subject: [PATCH] Add Emby Music Provider (#3096) * Add Emby Music Provider * Update constants, Error handling and Audio Format for Emby Music Provider * Fix StreamDetails AudioFormat for Emby Music Provider * Revert "Fix StreamDetails AudioFormat for Emby Music Provider" This reverts commit 41ee5616ff6ee3216e5bc31df7426c8bc124f8de. * Fix StreamDetails AudioFormat for Emby Music Provider --------- Co-authored-by: Marcel van der Veldt --- music_assistant/providers/emby/__init__.py | 508 ++++++++++++++++++ music_assistant/providers/emby/const.py | 63 +++ music_assistant/providers/emby/icon.svg | 7 + .../providers/emby/icon_monochrome.svg | 44 ++ music_assistant/providers/emby/manifest.json | 12 + music_assistant/providers/emby/parsers.py | 268 +++++++++ 6 files changed, 902 insertions(+) create mode 100644 music_assistant/providers/emby/__init__.py create mode 100644 music_assistant/providers/emby/const.py create mode 100644 music_assistant/providers/emby/icon.svg create mode 100644 music_assistant/providers/emby/icon_monochrome.svg create mode 100644 music_assistant/providers/emby/manifest.json create mode 100644 music_assistant/providers/emby/parsers.py diff --git a/music_assistant/providers/emby/__init__.py b/music_assistant/providers/emby/__init__.py new file mode 100644 index 00000000..87ae3a61 --- /dev/null +++ b/music_assistant/providers/emby/__init__.py @@ -0,0 +1,508 @@ +"""Emby Music Provider for MusicAssistant.""" + +from __future__ import annotations + +import hashlib +import socket +from asyncio import TaskGroup +from collections.abc import AsyncGenerator +from typing import TYPE_CHECKING, Any +from urllib.parse import urljoin + +from aiohttp import ClientResponseError +from music_assistant_models.config_entries import ( + ConfigEntry, + ConfigValueType, + ProviderConfig, +) +from music_assistant_models.enums import ( + ConfigEntryType, + MediaType, + ProviderFeature, + StreamType, +) +from music_assistant_models.errors import ( + LoginFailed, + MediaNotFoundError, + ProviderPermissionDenied, +) +from music_assistant_models.media_items import ( + Album, + Artist, + AudioFormat, + Playlist, + SearchResults, + Track, +) +from music_assistant_models.streamdetails import StreamDetails + +from music_assistant.controllers.cache import use_cache +from music_assistant.mass import MusicAssistant +from music_assistant.models import ProviderInstanceType +from music_assistant.models.music_provider import MusicProvider +from music_assistant.providers.emby.const import ( + ALBUM_FIELDS, + ARTIST_FIELDS, + AUTH_ACCESS_TOKEN, + AUTH_USER, + ITEM_KEY_COLLECTION_TYPE, + ITEM_KEY_ID, + ITEM_KEY_MEDIA_STREAMS, + ITEM_LIMIT, + ITEMS, + SUPPORTED_CONTAINER_FORMATS, + TRACK_FIELDS, +) +from music_assistant.providers.emby.parsers import ( + parse_album, + parse_artist, + parse_playlist, + parse_track, +) + +if TYPE_CHECKING: + from music_assistant_models.provider import ProviderManifest + +from music_assistant.constants import ( + APPLICATION_NAME, + CONF_IP_ADDRESS, + CONF_PASSWORD, + CONF_USERNAME, +) + +SUPPORTED_FEATURES = { + ProviderFeature.LIBRARY_ARTISTS, + ProviderFeature.LIBRARY_ALBUMS, + ProviderFeature.LIBRARY_TRACKS, + ProviderFeature.LIBRARY_PLAYLISTS, + ProviderFeature.BROWSE, + ProviderFeature.SEARCH, + ProviderFeature.ARTIST_ALBUMS, + ProviderFeature.ARTIST_TOPTRACKS, + ProviderFeature.SIMILAR_TRACKS, +} + + +async def setup( + mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig +) -> ProviderInstanceType: + """Initialize provider(instance) with given configuration.""" + return EmbyProvider(mass, manifest, config, SUPPORTED_FEATURES) + + +async def get_config_entries( + mass: MusicAssistant, + instance_id: str | None = None, + action: str | None = None, + values: dict[str, ConfigValueType] | None = None, +) -> tuple[ConfigEntry, ...]: + """Get configuration entries for provider setup.""" + # ruff: noqa: ARG001 + return ( + ConfigEntry( + key=CONF_IP_ADDRESS, + type=ConfigEntryType.STRING, + label="Server", + required=True, + description="The url of the Emby server to connect to.", + ), + ConfigEntry( + key=CONF_USERNAME, + type=ConfigEntryType.STRING, + label="Username", + required=True, + description="The username to authenticate to the remote server.", + ), + ConfigEntry( + key=CONF_PASSWORD, + type=ConfigEntryType.SECURE_STRING, + label="Password", + required=False, + description="The password to authenticate to the remote server.", + ), + ) + + +class EmbyProvider(MusicProvider): + """Provider for an Emby music library (uses Emby REST API).""" + + async def handle_async_init(self) -> None: + """Initialize provider(instance) with given configuration.""" + username = str(self.config.get_value(CONF_USERNAME)) + password = str(self.config.get_value(CONF_PASSWORD) or "") + self._base_url = str(self.config.get_value(CONF_IP_ADDRESS)).rstrip("/") + "/" + self._session = self.mass.http_session + + # stable device id + device_id = hashlib.sha256(f"{self.mass.server_id}+{username}".encode()).hexdigest() + self._device_id = device_id + self._device_name = socket.gethostname() + + # authenticate against Emby /Users/AuthenticateByName + auth_url = urljoin(self._base_url, "Users/AuthenticateByName") + payload = {"Username": username, "Pw": password} + headers = { + "Accept": "application/json", + "X-Emby-Authorization": ( + f'MediaBrowser Client="{APPLICATION_NAME}", ' + f'Device="{self._device_name}", ' + f'DeviceId="{device_id}", ' + f'Version="{self.mass.version}"' + ), + } + try: + async with self._session.post(auth_url, json=payload, headers=headers) as resp: + resp.raise_for_status() + data = await resp.json() + except ClientResponseError as err: + if err.status == 401: + raise LoginFailed("Unauthorized: invalid credentials") from err + if err.status == 403: + raise ProviderPermissionDenied("Forbidden: insufficient permissions") from err + if err.status == 404: + raise MediaNotFoundError("Authentication endpoint not found") from err + raise + + # store token and user id + token = data.get(AUTH_ACCESS_TOKEN) + user = data.get(AUTH_USER) + if not token or not user: + raise LoginFailed("Authentication failed: missing token/user in response") + self._token = token + self._user_id = user.get(ITEM_KEY_ID) + self._headers = { + "Accept": "application/json", + "X-Emby-Token": self._token, + "X-Emby-Authorization": ( + f'MediaBrowser Client="{APPLICATION_NAME}", ' + f'Device="{self._device_name}", ' + f'DeviceId="{device_id}", ' + f'Version="{self.mass.version}", ' + f'Token="{self._token}"' + ), + } + + @property + def is_streaming_provider(self) -> bool: + """Return True if provider supports streaming.""" + return False + + async def _get(self, path: str, params: dict[str, Any] | None = None) -> dict[str, Any]: + url = urljoin(self._base_url, path.lstrip("/")) + try: + async with self._session.get(url, headers=self._headers, params=params) as resp: + resp.raise_for_status() + return await resp.json() # type: ignore[no-any-return] + except ClientResponseError as err: + if err.status == 401: + raise LoginFailed("Unauthorized: invalid credentials") from err + if err.status == 403: + raise ProviderPermissionDenied("Forbidden: insufficient permissions") from err + if err.status == 404: + raise MediaNotFoundError(f"Item {path} not found") from err + raise + + async def _search_items( + self, search_query: str, include_types: str, fields: list[str], limit: int + ) -> list[dict[str, Any]]: + params = { + "SearchTerm": search_query, + "IncludeItemTypes": include_types, + "EnableUserData": "true", + "Fields": ",".join(fields or []), + "Limit": str(limit), + "Recursive": "true", + } + resp = await self._get(f"Users/{self._user_id}/Items", params=params) + return resp.get(ITEMS, []) # type: ignore[no-any-return] + + async def _search_track(self, search_query: str, limit: int) -> list[Track]: + items = await self._search_items(search_query, "Audio", TRACK_FIELDS, limit) + return [parse_track(self.instance_id, self, item) for item in items] + + async def _search_album(self, search_query: str, limit: int) -> list[Album]: + albumname = search_query.split(" - ", 1)[1] if " - " in search_query else search_query + items = await self._search_items(albumname, "MusicAlbum", ALBUM_FIELDS, limit) + return [parse_album(self.instance_id, self, item) for item in items] + + async def _search_artist(self, search_query: str, limit: int) -> list[Artist]: + items = await self._search_items(search_query, "MusicArtist", ARTIST_FIELDS, limit) + return [parse_artist(self.instance_id, self, item) for item in items] + + async def _search_playlist(self, search_query: str, limit: int) -> list[Playlist]: + items = await self._search_items(search_query, "Playlist", [], limit) + return [parse_playlist(self.instance_id, self, item) for item in items] + + @use_cache(60 * 15) + async def search( + self, + search_query: str, + media_types: list[MediaType], + limit: int = 20, + ) -> SearchResults: + """Search for media items in the Emby library.""" + artists = None + albums = None + tracks = None + playlists = None + + async with TaskGroup() as tg: + if MediaType.ARTIST in media_types: + artists = tg.create_task(self._search_artist(search_query, limit)) + if MediaType.ALBUM in media_types: + albums = tg.create_task(self._search_album(search_query, limit)) + if MediaType.TRACK in media_types: + tracks = tg.create_task(self._search_track(search_query, limit)) + if MediaType.PLAYLIST in media_types: + playlists = tg.create_task(self._search_playlist(search_query, limit)) + + search_results = SearchResults() + if artists: + search_results.artists = artists.result() + if albums: + search_results.albums = albums.result() + if tracks: + search_results.tracks = tracks.result() + if playlists: + search_results.playlists = playlists.result() + return search_results + + async def get_library_artists(self) -> AsyncGenerator[Artist, None]: + """Yield all artists from the music library.""" + libs = await self._get_music_libraries() + for lib in libs: + params = { + "ParentId": lib[ITEM_KEY_ID], + "IncludeItemTypes": "MusicArtist", + "EnableUserData": "true", + "Fields": ",".join(ARTIST_FIELDS), + "Recursive": "true", + } + page = 0 + while True: + params["StartIndex"] = str(page * ITEM_LIMIT) + params["Limit"] = ITEM_LIMIT + resp = await self._get(f"Users/{self._user_id}/Items", params=params) + items = resp.get(ITEMS, []) + if not items: + break + for artist in items: + yield parse_artist(self.instance_id, self, artist) + page += 1 + + async def get_library_albums(self) -> AsyncGenerator[Album, None]: + """Yield all albums from the music library.""" + libs = await self._get_music_libraries() + for lib in libs: + params = { + "ParentId": lib[ITEM_KEY_ID], + "IncludeItemTypes": "MusicAlbum", + "EnableUserData": "true", + "Fields": ",".join(ALBUM_FIELDS), + "Recursive": "true", + } + page = 0 + while True: + params["StartIndex"] = str(page * ITEM_LIMIT) + params["Limit"] = ITEM_LIMIT + resp = await self._get(f"Users/{self._user_id}/Items", params=params) + items = resp.get(ITEMS, []) + if not items: + break + for album in items: + yield parse_album(self.instance_id, self, album) + page += 1 + + async def get_library_tracks(self) -> AsyncGenerator[Track, None]: + """Yield all tracks from the music library.""" + libs = await self._get_music_libraries() + for lib in libs: + params = { + "ParentId": lib[ITEM_KEY_ID], + "IncludeItemTypes": "Audio", + "EnableUserData": "true", + "Fields": ",".join(TRACK_FIELDS), + "Recursive": "true", + } + page = 0 + while True: + params["StartIndex"] = str(page * ITEM_LIMIT) + params["Limit"] = ITEM_LIMIT + resp = await self._get(f"Users/{self._user_id}/Items", params=params) + items = resp.get(ITEMS, []) + if not items: + break + for track in items: + if not len(track.get(ITEM_KEY_MEDIA_STREAMS, [])): + continue + yield parse_track(self.instance_id, self, track) + page += 1 + + async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]: + """Yield all playlists from the music library.""" + libs = await self._get_music_libraries() + for lib in libs: + params = { + "ParentId": lib[ITEM_KEY_ID], + "IncludeItemTypes": "Playlist", + "EnableUserData": "true", + "Recursive": "true", + } + page = 0 + while True: + params["StartIndex"] = str(page * ITEM_LIMIT) + params["Limit"] = ITEM_LIMIT + resp = await self._get(f"Users/{self._user_id}/Items", params=params) + items = resp.get(ITEMS, []) + if not items: + break + for playlist in items: + yield parse_playlist(self.instance_id, self, playlist) + page += 1 + + @use_cache(3600) + async def get_album(self, prov_album_id: str) -> Album: + """Get album by provider album id.""" + album = await self._get( + f"Users/{self._user_id}/Items/{prov_album_id}", + params={ + "EnableUserData": "true", + "Fields": ",".join(ALBUM_FIELDS), + "Recursive": "true", + }, + ) + return parse_album(self.instance_id, self, album) + + @use_cache(3600) + async def get_album_tracks(self, prov_album_id: str) -> list[Track]: + """Get tracks for a given album by provider album id.""" + params = { + "ParentId": prov_album_id, + "IncludeItemTypes": "Audio", + "EnableUserData": "true", + "Fields": ",".join(TRACK_FIELDS), + "Limit": ITEM_LIMIT, + "Recursive": "true", + } + resp = await self._get(f"Users/{self._user_id}/Items", params=params) + return [parse_track(self.instance_id, self, item) for item in resp.get(ITEMS, [])] + + @use_cache(60 * 15) + async def get_artist(self, prov_artist_id: str) -> Artist: + """Get artist by provider artist id.""" + artist_data = await self._get( + f"Users/{self._user_id}/Items/{prov_artist_id}", + params={"EnableUserData": "true", "Fields": ",".join(ARTIST_FIELDS)}, + ) + + return parse_artist(self.instance_id, self, artist_data) + + @use_cache(3600) + async def get_artist_toptracks(self, prov_artist_id: str, limit: int = 25) -> list[Track]: + """Get top tracks for a given artist by provider artist id.""" + params = { + "ArtistIds": prov_artist_id, + "IncludeItemTypes": "Audio", + "EnableUserData": "true", + "Fields": ",".join(TRACK_FIELDS), + "Recursive": "true", + "Limit": str(limit), + "SortBy": "PlayCount", + "SortOrder": "Descending", + } + resp = await self._get(f"Users/{self._user_id}/Items", params=params) + return [parse_track(self.instance_id, self, item) for item in resp.get(ITEMS, [])] + + @use_cache(60 * 15) + async def get_track(self, prov_track_id: str) -> Track: + """Get track by provider track id.""" + track = await self._get( + f"Users/{self._user_id}/Items/{prov_track_id}", + params={"EnableUserData": "true", "Fields": ",".join(TRACK_FIELDS)}, + ) + + return parse_track(self.instance_id, self, track) + + @use_cache(60 * 15) + async def get_playlist(self, prov_playlist_id: str) -> Playlist: + """Get playlist by provider playlist id.""" + playlist = await self._get( + f"Users/{self._user_id}/Items/{prov_playlist_id}", + params={"EnableUserData": "true"}, + ) + + return parse_playlist(self.instance_id, self, playlist) + + @use_cache(3600) + async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> list[Track]: + """Get tracks for a given playlist by provider playlist id.""" + result: list[Track] = [] + params = { + "ParentId": prov_playlist_id, + "IncludeItemTypes": "Audio", + "EnableUserData": "true", + "Fields": ",".join(TRACK_FIELDS), + "Limit": ITEM_LIMIT, + "StartIndex": str(page * ITEM_LIMIT), + } + resp = await self._get(f"Users/{self._user_id}/Items", params=params) + for index, item in enumerate(resp.get(ITEMS, []), 1): + pos = (page * ITEM_LIMIT) + index + if track := parse_track(self.instance_id, self, item): + track.position = pos + result.append(track) + + return result + + @use_cache(3600) + async def get_artist_albums(self, prov_artist_id: str) -> list[Album]: + """Get albums for a given artist by provider artist id.""" + params = { + "AlbumArtistIds": prov_artist_id, + "IncludeItemTypes": "MusicAlbum", + "Fields": ",".join(ALBUM_FIELDS), + "EnableUserData": "true", + "Recursive": "true", + } + resp = await self._get(f"Users/{self._user_id}/Items", params=params) + return [parse_album(self.instance_id, self, album) for album in resp.get(ITEMS, [])] + + async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails: + """Get stream details for given item id and media type.""" + track = await self.get_track(item_id) + # build universal audio URL (include token as query param for convenience) + container = ",".join(SUPPORTED_CONTAINER_FORMATS) + url = urljoin(self._base_url, f"Audio/{track.item_id}/universal") + params = {"Container": container, "api_key": self._token} + query = "&".join([f"{k}={v}" for k, v in params.items()]) + return StreamDetails( + item_id=track.item_id, + provider=self.instance_id, + audio_format=AudioFormat(), + stream_type=StreamType.HTTP, + duration=int(track.duration) if getattr(track, "duration", None) else 0, + path=f"{url}?{query}", + can_seek=True, + allow_seek=True, + ) + + @use_cache(3600) + async def get_similar_tracks(self, prov_track_id: str, limit: int = 25) -> list[Track]: + """Get similar tracks.""" + resp = await self._get( + f"Items/{prov_track_id}/Similar", + params={"Limit": str(limit), "Fields": ",".join(TRACK_FIELDS)}, + ) + + return [parse_track(self.instance_id, self, t) for t in resp.get(ITEMS, [])] + + async def _get_music_libraries(self) -> list[dict[str, Any]]: + resp = await self._get("Library/MediaFolders") + libs = resp.get(ITEMS, []) + result = [] + for library in libs: + if ITEM_KEY_COLLECTION_TYPE in library: + collection_type = library.get(ITEM_KEY_COLLECTION_TYPE, "").lower() + if collection_type == "music": + result.append(library) + return result diff --git a/music_assistant/providers/emby/const.py b/music_assistant/providers/emby/const.py new file mode 100644 index 00000000..733d7b73 --- /dev/null +++ b/music_assistant/providers/emby/const.py @@ -0,0 +1,63 @@ +"""Constants for Emby provider.""" + +# Emby API item keys +AUTH_ACCESS_TOKEN = "AccessToken" +AUTH_USER = "User" + +ITEMS = "Items" +ITEM_LIMIT = 500 +ITEM_KEY_ID = "Id" +ITEM_KEY_RUNTIME_TICKS = "RunTimeTicks" +ITEM_KEY_MEDIA_STREAMS = "MediaStreams" +ITEM_KEY_COLLECTION_TYPE = "CollectionType" +ITEM_KEY_NAME = "Name" +ITEM_KEY_ALBUM_ID = "AlbumId" +ITEM_KEY_ALBUM_NAME = "Album" +ITEM_KEY_ARTIST_ITEMS = "ArtistItems" +ITEM_KEY_IMAGE_TAGS = "ImageTags" +ITEM_KEY_DATE_CREATED = "DateCreated" +ITEM_KEY_PRODUCTION_YEAR = "ProductionYear" +ITEM_KEY_OVERVIEW = "Overview" +ITEM_KEY_DURATION = "Duration" +ITEM_KEY_ARTISTS = "Artists" +ITEM_KEY_PLAYLIST_ITEMS = "PlaylistItems" +ITEM_KEY_TYPE = "Type" +ITEM_KEY_CONTAINER = "Container" + +AUDIO_STREAM_CODEC = "Codec" +AUDIO_STREAM_SAMPLE_RATE = "SampleRate" +AUDIO_STREAM_BIT_DEPTH = "BitDepth" +AUDIO_STREAM_CHANNELS = "Channels" + +# Field lists for API requests +TRACK_FIELDS = [ + "Name", + "Artists", + "Album", + "AlbumId", + "Duration", + "RunTimeTicks", + "MediaStreams", + "ImageTags", + "DateCreated", +] + +ALBUM_FIELDS = [ + "Name", + "Artists", + "ArtistItems", + "Overview", + "ImageTags", + "DateCreated", + "ProductionYear", +] + +ARTIST_FIELDS = [ + "Name", + "Overview", + "ImageTags", + "DateCreated", +] + +# Supported audio containers for streaming +SUPPORTED_CONTAINER_FORMATS = ["mp3", "flac", "aac", "opus", "wav", "m4a"] diff --git a/music_assistant/providers/emby/icon.svg b/music_assistant/providers/emby/icon.svg new file mode 100644 index 00000000..64593281 --- /dev/null +++ b/music_assistant/providers/emby/icon.svg @@ -0,0 +1,7 @@ + + + + + + + diff --git a/music_assistant/providers/emby/icon_monochrome.svg b/music_assistant/providers/emby/icon_monochrome.svg new file mode 100644 index 00000000..b04ce536 --- /dev/null +++ b/music_assistant/providers/emby/icon_monochrome.svg @@ -0,0 +1,44 @@ + + + + + + + + diff --git a/music_assistant/providers/emby/manifest.json b/music_assistant/providers/emby/manifest.json new file mode 100644 index 00000000..d9d053c9 --- /dev/null +++ b/music_assistant/providers/emby/manifest.json @@ -0,0 +1,12 @@ +{ + "type": "music", + "domain": "emby", + "stage": "alpha", + "name": "Emby Media Server Library", + "description": "Stream music from your self-hosted Emby server.", + "codeowners": ["@hatharry"], + "credits": [], + "requirements": [], + "documentation": "https://music-assistant.io/music-providers/emby/", + "multi_instance": true +} diff --git a/music_assistant/providers/emby/parsers.py b/music_assistant/providers/emby/parsers.py new file mode 100644 index 00000000..1717cb00 --- /dev/null +++ b/music_assistant/providers/emby/parsers.py @@ -0,0 +1,268 @@ +"""Parsers for Emby API responses.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from music_assistant_models.enums import ContentType, ImageType +from music_assistant_models.media_items import ( + Album, + Artist, + AudioFormat, + ItemMapping, + MediaItemImage, + Playlist, + ProviderMapping, + Track, +) +from music_assistant_models.unique_list import UniqueList + +from music_assistant.providers.emby.const import ( + AUDIO_STREAM_BIT_DEPTH, + AUDIO_STREAM_CHANNELS, + AUDIO_STREAM_CODEC, + AUDIO_STREAM_SAMPLE_RATE, + ITEM_KEY_ALBUM_ID, + ITEM_KEY_ALBUM_NAME, + ITEM_KEY_ARTIST_ITEMS, + ITEM_KEY_CONTAINER, + ITEM_KEY_ID, + ITEM_KEY_IMAGE_TAGS, + ITEM_KEY_MEDIA_STREAMS, + ITEM_KEY_NAME, + ITEM_KEY_RUNTIME_TICKS, + ITEM_KEY_TYPE, +) + +if TYPE_CHECKING: + from music_assistant.providers.emby import EmbyProvider + + +def parse_track( + instance_id: str, + provider: EmbyProvider, + item: dict[str, Any], +) -> Track: + """Parse an Emby Audio item into a Track.""" + track_id = str(item.get(ITEM_KEY_ID)) + name = str(item.get(ITEM_KEY_NAME)) + + # Extract artist info + artists = UniqueList[Artist | ItemMapping]() + if artist_items := item.get(ITEM_KEY_ARTIST_ITEMS): + for artist_item in artist_items: + artist_name = str(artist_item.get(ITEM_KEY_NAME)) + artist_id = str(artist_item.get(ITEM_KEY_ID)) + + artists.append( + Artist( + item_id=artist_id, + name=artist_name, + provider=instance_id, + provider_mappings={ + ProviderMapping( + item_id=artist_id, + provider_domain=provider.domain, + provider_instance=instance_id, + ) + }, + ) + ) + + album_id = str(item.get(ITEM_KEY_ALBUM_ID)) + album_name = str(item.get(ITEM_KEY_ALBUM_NAME)) + + album = Album( + item_id=album_id, + name=album_name, + provider=instance_id, + provider_mappings={ + ProviderMapping( + item_id=album_id, + provider_domain=provider.domain, + provider_instance=instance_id, + ) + }, + ) + + duration = int(item.get(ITEM_KEY_RUNTIME_TICKS, 0) / 10000000) # Convert ticks to seconds + media_streams = item.get(ITEM_KEY_MEDIA_STREAMS, [{}]) + audio_stream = next((dict(s) for s in media_streams if s.get(ITEM_KEY_TYPE) == "Audio"), {}) + + track = Track( + item_id=track_id, + name=name, + album=album, + artists=artists, + duration=duration, + provider=instance_id, + provider_mappings={ + ProviderMapping( + item_id=track_id, + provider_domain=provider.domain, + provider_instance=instance_id, + audio_format=AudioFormat( + content_type=ContentType.try_parse(str(item.get(ITEM_KEY_CONTAINER))), + codec_type=ContentType.try_parse(str(audio_stream.get(AUDIO_STREAM_CODEC))), + sample_rate=int(audio_stream.get(AUDIO_STREAM_SAMPLE_RATE, 44100)), + bit_depth=int(audio_stream.get(AUDIO_STREAM_BIT_DEPTH, 16)), + channels=int(audio_stream.get(AUDIO_STREAM_CHANNELS, 2)), + ), + ) + }, + ) + + # Extract images + if "Primary" in item.get(ITEM_KEY_IMAGE_TAGS, {}): + image_url = f"{provider._base_url}Items/{track_id}/Images/Primary" + if track.metadata.images is None: + track.metadata.images = UniqueList[MediaItemImage]() + track.metadata.images.append( + MediaItemImage( + type=ImageType.THUMB, + path=image_url, + provider=instance_id, + remotely_accessible=True, + ) + ) + + return track + + +def parse_artist( + instance_id: str, + provider: EmbyProvider, + item: dict[str, Any], +) -> Artist: + """Parse an Emby MusicArtist item into an Artist.""" + artist_id = str(item.get(ITEM_KEY_ID)) + name = str(item.get(ITEM_KEY_NAME)) + + artist = Artist( + item_id=artist_id, + name=name, + provider=instance_id, + provider_mappings={ + ProviderMapping( + item_id=artist_id, + provider_domain=provider.domain, + provider_instance=instance_id, + ) + }, + ) + + # Extract images + if "Primary" in item.get(ITEM_KEY_IMAGE_TAGS, {}): + image_url = f"{provider._base_url}Items/{artist_id}/Images/Primary" + if artist.metadata.images is None: + artist.metadata.images = UniqueList[MediaItemImage]() + artist.metadata.images.append( + MediaItemImage( + type=ImageType.THUMB, + path=image_url, + provider=instance_id, + remotely_accessible=True, + ) + ) + + return artist + + +def parse_album( + instance_id: str, + provider: EmbyProvider, + item: dict[str, Any], +) -> Album: + """Parse an Emby MusicAlbum item into an Album.""" + album_id = str(item.get(ITEM_KEY_ID)) + name = str(item.get(ITEM_KEY_NAME)) + + # Extract artist info + artists = UniqueList[Artist | ItemMapping]() + if artist_items := item.get(ITEM_KEY_ARTIST_ITEMS): + for artist_item in artist_items: + artist_id = str(artist_item.get(ITEM_KEY_ID)) + artist_name = str(artist_item.get(ITEM_KEY_NAME)) + + artists.append( + Artist( + item_id=artist_id, + name=artist_name, + provider=instance_id, + provider_mappings={ + ProviderMapping( + item_id=artist_id, + provider_domain=provider.domain, + provider_instance=instance_id, + ) + }, + ) + ) + + album = Album( + item_id=album_id, + name=name, + artists=artists, + provider=instance_id, + provider_mappings={ + ProviderMapping( + item_id=album_id, + provider_domain=provider.domain, + provider_instance=instance_id, + ) + }, + ) + + # Extract images + if image_id := item.get("PrimaryImageItemId"): + image_url = f"{provider._base_url}Items/{image_id}/Images/Primary" + if album.metadata.images is None: + album.metadata.images = UniqueList[MediaItemImage]() + album.metadata.images.append( + MediaItemImage( + type=ImageType.THUMB, + path=image_url, + provider=instance_id, + remotely_accessible=True, + ) + ) + + return album + + +def parse_playlist( + instance_id: str, + provider: EmbyProvider, + item: dict[str, Any], +) -> Playlist: + """Parse an Emby Playlist item into a Playlist.""" + playlist_id = str(item.get(ITEM_KEY_ID)) + name = str(item.get(ITEM_KEY_NAME)) + + playlist = Playlist( + item_id=playlist_id, + name=name, + provider=instance_id, + provider_mappings={ + ProviderMapping( + item_id=playlist_id, + provider_domain=provider.domain, + provider_instance=instance_id, + ) + }, + ) + # Extract images + if "Primary" in item.get(ITEM_KEY_IMAGE_TAGS, {}): + image_url = f"{provider._base_url}Items/{playlist_id}/Images/Primary" + if playlist.metadata.images is None: + playlist.metadata.images = UniqueList[MediaItemImage]() + playlist.metadata.images.append( + MediaItemImage( + type=ImageType.THUMB, + path=image_url, + provider=instance_id, + remotely_accessible=True, + ) + ) + + return playlist -- 2.34.1