from collections.abc import Callable
from typing import TYPE_CHECKING, Any
-from music_assistant.client.exceptions import ConnectionClosed, InvalidServerVersion, InvalidState
+from music_assistant.client.exceptions import (
+ ConnectionClosed,
+ InvalidServerVersion,
+ InvalidState,
+)
from music_assistant.common.models.api import (
- ChunkedResultMessage,
CommandMessage,
ErrorResultMessage,
EventMessage,
SuccessResultMessage,
parse_message,
)
-from music_assistant.common.models.enums import EventType
+from music_assistant.common.models.enums import EventType, ImageType
from music_assistant.common.models.errors import ERROR_MAP
from music_assistant.common.models.event import MassEvent
+from music_assistant.common.models.media_items import ItemMapping, MediaItemType
+from music_assistant.common.models.provider import ProviderInstance, ProviderManifest
+from music_assistant.common.models.queue_item import QueueItem
from music_assistant.constants import API_SCHEMA_VERSION
+from .config import Config
from .connection import WebsocketsConnection
from .music import Music
+from .player_queues import PlayerQueues
from .players import Players
if TYPE_CHECKING:
self._subscribers: list[EventSubscriptionType] = []
self._stop_called: bool = False
self._loop: asyncio.AbstractEventLoop | None = None
+ self._config = Config(self)
self._players = Players(self)
+ self._player_queues = PlayerQueues(self)
self._music = Music(self)
# below items are retrieved after connect
self._server_info: ServerInfoMessage | None = None
+ self._provider_manifests: dict[str, ProviderManifest] = {}
+ self._providers: dict[str, ProviderInstance] = {}
@property
def server_info(self) -> ServerInfoMessage | None:
"""Return info of the server we're currently connected to."""
return self._server_info
+ @property
+ def providers(self) -> list[ProviderInstance]:
+ """Return all loaded/running Providers (instances)."""
+ return list(self._providers.values())
+
+ @property
+ def provider_manifests(self) -> list[ProviderManifest]:
+ """Return all Provider manifests."""
+ return list(self._provider_manifests.values())
+
+ @property
+ def config(self) -> Config:
+ """Return Config handler."""
+ return self._config
+
@property
def players(self) -> Players:
"""Return Players handler."""
return self._players
+ @property
+ def player_queues(self) -> PlayerQueues:
+ """Return PlayerQueues handler."""
+ return self._player_queues
+
@property
def music(self) -> Music:
"""Return Music handler."""
return self._music
- def get_image_url(self, image: MediaItemImage) -> str:
+ def get_provider_manifest(self, domain: str) -> ProviderManifest:
+ """Return Provider manifests of single provider(domain)."""
+ return self._provider_manifests[domain]
+
+ def get_provider(
+ self, provider_instance_or_domain: str, return_unavailable: bool = False
+ ) -> ProviderInstance | None:
+ """Return provider by instance id or domain."""
+ # lookup by instance_id first
+ if prov := self._providers.get(provider_instance_or_domain):
+ if return_unavailable or prov.available:
+ return prov
+ if not prov.is_streaming_provider:
+ # no need to lookup other instances because this provider has unique data
+ return None
+ provider_instance_or_domain = prov.domain
+ # fallback to match on domain
+ for prov in self._providers.values():
+ if prov.domain != provider_instance_or_domain:
+ continue
+ if return_unavailable or prov.available:
+ return prov
+ self.logger.debug("Provider %s is not available", provider_instance_or_domain)
+ return None
+
+ def get_image_url(self, image: MediaItemImage, size: int = 0) -> str:
"""Get (proxied) URL for MediaItemImage."""
- if image.remotely_accessible:
+ if image.remotely_accessible and not size:
return image.path
+ if image.remotely_accessible and size:
+ # get url to resized image(thumb) from weserv service
+ return (
+ f"https://images.weserv.nl/?url={urllib.parse.quote(image.path)}"
+ f"&w=${size}&h=${size}&fit=cover&a=attention"
+ )
# return imageproxy url for images that need to be resolved
# the original path is double encoded
encoded_url = urllib.parse.quote(urllib.parse.quote(image.path))
return (
- f"{self.server_info.base_url}/imageproxy?path={encoded_url}&provider={image.provider}"
+ f"{self.server_info.base_url}/imageproxy?path={encoded_url}"
+ f"&provider={image.provider}&size={size}"
)
+ def get_media_item_image_url(
+ self,
+ item: MediaItemType | ItemMapping | QueueItem,
+ type: ImageType = ImageType.THUMB, # noqa: A002
+ size: int = 0,
+ ) -> str | None:
+ """Get image URL for MediaItem, QueueItem or ItemMapping."""
+ # handle queueitem with media_item attribute
+ if media_item := getattr(item, "media_item", None):
+ if img := self.music.get_media_item_image(media_item, type):
+ return self.get_image_url(img, size)
+ if img := self.music.get_media_item_image(item, type):
+ return self.get_image_url(img, size)
+ return None
+
def subscribe(
self,
cb_func: EventCallBackType,
self._server_info = info
self.logger.info(
- "Connected to Music Assistant Server %s using %s, Version %s, Schema Version %s",
+ "Connected to Music Assistant Server %s, Version %s, Schema Version %s",
info.server_id,
- self.connection.__class__.__name__,
info.server_version,
info.schema_version,
)
# fetch initial state
# we do this in a separate task to not block reading messages
async def fetch_initial_state() -> None:
+ self._providers = {
+ x["instance_id"]: ProviderInstance.from_dict(x)
+ for x in await self.send_command("providers")
+ }
+ self._provider_manifests = {
+ x["domain"]: ProviderManifest.from_dict(x)
+ for x in await self.send_command("providers/manifests")
+ }
+ await self._player_queues.fetch_state()
await self._players.fetch_state()
if init_ready is not None:
if future is None:
# no listener for this result
return
- if isinstance(msg, ChunkedResultMessage):
- # handle chunked response (for very large objects)
- if not hasattr(future, "intermediate_result"):
- future.intermediate_result = []
- future.intermediate_result += msg.result
- if msg.is_last_chunk:
- future.set_result(future.intermediate_result)
- return
if isinstance(msg, SuccessResultMessage):
future.set_result(msg.result)
return
if self._stop_called:
return
+ if event.event == EventType.PROVIDERS_UPDATED:
+ self._providers = {x["instance_id"]: ProviderInstance.from_dict(x) for x in event.data}
+
for cb_func, event_filter, id_filter in self._subscribers:
if not (event_filter is None or event.event in event_filter):
continue
--- /dev/null
+"""Handle Config related endpoints for Music Assistant."""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from music_assistant.common.models.config_entries import (
+ ConfigEntry,
+ ConfigValueType,
+ CoreConfig,
+ PlayerConfig,
+ ProviderConfig,
+)
+from music_assistant.common.models.enums import ProviderType
+
+if TYPE_CHECKING:
+ from .client import MusicAssistantClient
+
+
+class Config:
+ """Config related endpoints/data for Music Assistant."""
+
+ def __init__(self, client: MusicAssistantClient) -> None:
+ """Handle Initialization."""
+ self.client = client
+
+ # Provider Config related commands/functions
+
+ async def get_provider_configs(
+ self,
+ provider_type: ProviderType | None = None,
+ provider_domain: str | None = None,
+ include_values: bool = False,
+ ) -> list[ProviderConfig]:
+ """Return all known provider configurations, optionally filtered by ProviderType."""
+ return [
+ ProviderConfig.from_dict(item)
+ for item in await self.client.send_command(
+ "config/providers",
+ provider_type=provider_type,
+ provider_domain=provider_domain,
+ include_values=include_values,
+ )
+ ]
+
+ async def get_provider_config(self, instance_id: str) -> ProviderConfig:
+ """Return (full) configuration for a single provider."""
+ return ProviderConfig.from_dict(
+ await self.client.send_command("config/providers/get", instance_id=instance_id)
+ )
+
+ async def get_provider_config_value(self, instance_id: str, key: str) -> ConfigValueType:
+ """Return single configentry value for a provider."""
+ return await self.client.send_command(
+ "config/providers/get_value", instance_id=instance_id, key=key
+ )
+
+ async def get_provider_config_entries(
+ self,
+ provider_domain: str,
+ instance_id: str | None = None,
+ action: str | None = None,
+ values: dict[str, ConfigValueType] | None = None,
+ ) -> tuple[ConfigEntry, ...]:
+ """
+ Return Config entries to setup/configure a provider.
+
+ provider_domain: (mandatory) domain of the provider.
+ instance_id: id of an existing provider instance (None for new instance setup).
+ action: [optional] action key called from config entries UI.
+ values: the (intermediate) raw values for config entries sent with the action.
+ """
+ return (
+ ConfigEntry.from_dict(x)
+ for x in await self.client.send_command(
+ "config/providers/get_entries",
+ provider_domain=provider_domain,
+ instance_id=instance_id,
+ action=action,
+ values=values,
+ )
+ )
+
+ async def save_provider_config(
+ self,
+ provider_domain: str,
+ values: dict[str, ConfigValueType],
+ instance_id: str | None = None,
+ ) -> ProviderConfig:
+ """
+ Save Provider(instance) Config.
+
+ provider_domain: (mandatory) domain of the provider.
+ values: the raw values for config entries that need to be stored/updated.
+ instance_id: id of an existing provider instance (None for new instance setup).
+ """
+ return ProviderConfig.from_dict(
+ await self.client.send_command(
+ "config/providers/save",
+ provider_domain=provider_domain,
+ values=values,
+ instance_id=instance_id,
+ )
+ )
+
+ async def remove_provider_config(self, instance_id: str) -> None:
+ """Remove ProviderConfig."""
+ await self.client.send_command(
+ "config/providers/remove",
+ instance_id=instance_id,
+ )
+
+ async def reload_provider(self, instance_id: str) -> None:
+ """Reload provider."""
+ await self.client.send_command(
+ "config/providers/reload",
+ instance_id=instance_id,
+ )
+
+ # Player Config related commands/functions
+
+ async def get_player_configs(
+ self, provider: str | None = None, include_values: bool = False
+ ) -> list[PlayerConfig]:
+ """Return all known player configurations, optionally filtered by provider domain."""
+ return [
+ PlayerConfig.from_dict(item)
+ for item in await self.client.send_command(
+ "config/players",
+ provider=provider,
+ include_values=include_values,
+ )
+ ]
+
+ async def get_player_config(self, player_id: str) -> PlayerConfig:
+ """Return (full) configuration for a single player."""
+ return PlayerConfig.from_dict(
+ await self.client.send_command("config/players/get", player_id=player_id)
+ )
+
+ async def get_player_config_value(
+ self,
+ player_id: str,
+ key: str,
+ ) -> ConfigValueType:
+ """Return single configentry value for a player."""
+ return await self.client.send_command(
+ "config/players/get_value", player_id=player_id, key=key
+ )
+
+ async def save_player_config(
+ self, player_id: str, values: dict[str, ConfigValueType]
+ ) -> PlayerConfig:
+ """Save/update PlayerConfig."""
+ return PlayerConfig.from_dict(
+ await self.client.send_command(
+ "config/players/save", player_id=player_id, values=values
+ )
+ )
+
+ async def remove_player_config(self, player_id: str) -> None:
+ """Remove PlayerConfig."""
+ await self.client.send_command("config/players/remove", player_id=player_id)
+
+ # Core Controller config commands
+
+ async def get_core_configs(self, include_values: bool = False) -> list[CoreConfig]:
+ """Return all core controllers config options."""
+ return [
+ CoreConfig.from_dict(item)
+ for item in await self.client.send_command(
+ "config/core",
+ include_values=include_values,
+ )
+ ]
+
+ async def get_core_config(self, domain: str) -> CoreConfig:
+ """Return configuration for a single core controller."""
+ return CoreConfig.from_dict(
+ await self.client.send_command(
+ "config/core/get",
+ domain=domain,
+ )
+ )
+
+ async def get_core_config_value(self, domain: str, key: str) -> ConfigValueType:
+ """Return single configentry value for a core controller."""
+ return await self.client.send_command("config/core/get_value", domain=domain, key=key)
+
+ async def get_core_config_entries(
+ self,
+ domain: str,
+ action: str | None = None,
+ values: dict[str, ConfigValueType] | None = None,
+ ) -> tuple[ConfigEntry, ...]:
+ """
+ Return Config entries to configure a core controller.
+
+ core_controller: name of the core controller
+ action: [optional] action key called from config entries UI.
+ values: the (intermediate) raw values for config entries sent with the action.
+ """
+ return (
+ ConfigEntry.from_dict(x)
+ for x in await self.client.send_command(
+ "config/core/get_entries",
+ domain=domain,
+ action=action,
+ values=values,
+ )
+ )
+
+ async def save_core_config(
+ self,
+ domain: str,
+ values: dict[str, ConfigValueType],
+ ) -> CoreConfig:
+ """Save CoreController Config values."""
+ return CoreConfig.from_dict(
+ await self.client.send_command(
+ "config/core/get_entries",
+ domain=domain,
+ values=values,
+ )
+ )
import urllib.parse
from typing import TYPE_CHECKING
-from music_assistant.common.models.enums import MediaType
+from music_assistant.common.models.enums import ImageType, MediaType
from music_assistant.common.models.media_items import (
Album,
AlbumTrack,
Artist,
+ ItemMapping,
+ MediaItemImage,
+ MediaItemMetadata,
MediaItemType,
PagedItems,
Playlist,
+ PlaylistTrack,
Radio,
SearchResults,
Track,
limit: int | None = None,
offset: int | None = None,
order_by: str | None = None,
- ) -> PagedItems:
+ ) -> PagedItems[Track]:
"""Get Track listing from the server."""
return PagedItems.parse(
await self.client.send_command(
limit: int | None = None,
offset: int | None = None,
order_by: str | None = None,
- ) -> PagedItems:
+ ) -> PagedItems[Album]:
"""Get Albums listing from the server."""
return PagedItems.parse(
await self.client.send_command(
offset: int | None = None,
order_by: str | None = None,
album_artists_only: bool = False,
- ) -> PagedItems:
+ ) -> PagedItems[Artist]:
"""Get Artists listing from the server."""
return PagedItems.parse(
await self.client.send_command(
limit: int | None = None,
offset: int | None = None,
order_by: str | None = None,
- ) -> PagedItems:
+ ) -> PagedItems[Playlist]:
"""Get Playlists listing from the server."""
return PagedItems.parse(
await self.client.send_command(
self,
item_id: str,
provider_instance_id_or_domain: str,
- ) -> list[Track]:
+ limit: int | None = None,
+ offset: int | None = None,
+ ) -> PagedItems[PlaylistTrack]:
"""Get tracks for given playlist."""
- return [
- Track.from_dict(item)
- for item in await self.client.send_command(
+ return PagedItems.parse(
+ await self.client.send_command(
"music/playlists/playlist_tracks",
item_id=item_id,
provider_instance_id_or_domain=provider_instance_id_or_domain,
- )
- ]
+ limit=limit,
+ offset=offset,
+ ),
+ PlaylistTrack,
+ )
async def add_playlist_tracks(self, db_playlist_id: str | int, uris: list[str]) -> None:
"""Add multiple tracks to playlist. Creates background tasks to process the action."""
limit: int | None = None,
offset: int | None = None,
order_by: str | None = None,
- ) -> PagedItems:
+ ) -> PagedItems[Radio]:
"""Get Radio listing from the server."""
return PagedItems.parse(
await self.client.send_command(
- "music/radio/library_items",
+ "music/radios/library_items",
favorite=favorite,
search=search,
limit=limit,
"""Get single Radio from the server."""
return Radio.from_dict(
await self.client.send_command(
- "music/radio/get_item",
+ "music/radios/get_radio",
item_id=item_id,
provider_instance_id_or_domain=provider_instance_id_or_domain,
),
return [
Radio.from_dict(item)
for item in await self.client.send_command(
- "music/radio/radio_versions",
+ "music/radios/radio_versions",
item_id=item_id,
provider_instance_id_or_domain=provider_instance_id_or_domain,
)
# Other/generic endpoints/commands
+ async def start_sync(
+ self,
+ media_types: list[MediaType] | None = None,
+ providers: list[str] | None = None,
+ ) -> None:
+ """Start running the sync of (all or selected) musicproviders.
+
+ media_types: only sync these media types. None for all.
+ providers: only sync these provider instances. None for all.
+ """
+ await self.client.send_command("music/sync", media_types=media_types, providers=providers)
+
+ async def get_running_sync_tasks(self) -> list[SyncTask]:
+ """Return list with providers that are currently (scheduled for) syncing."""
+ return [SyncTask(**item) for item in await self.client.send_command("music/synctasks")]
+
+ async def search(
+ self,
+ search_query: str,
+ media_types: list[MediaType] = MediaType.ALL,
+ limit: int = 50,
+ ) -> SearchResults:
+ """Perform global search for media items on all providers.
+
+ :param search_query: Search query.
+ :param media_types: A list of media_types to include.
+ :param limit: number of items to return in the search (per type).
+ """
+ return SearchResults.from_dict(
+ await self.client.send_command(
+ "music/search",
+ search_query=search_query,
+ media_types=media_types,
+ limit=limit,
+ ),
+ )
+
+ async def browse(
+ self,
+ path: str | None = None,
+ limit: int | None = None,
+ offset: int | None = None,
+ ) -> PagedItems[MediaItemType]:
+ """Browse Music providers."""
+ return PagedItems.parse(
+ await self.client.send_command(
+ "music/browse",
+ path=path,
+ limit=limit,
+ offset=offset,
+ ),
+ MediaItemType,
+ )
+
+ async def recently_played(
+ self, limit: int = 10, media_types: list[MediaType] | None = None
+ ) -> list[MediaItemType]:
+ """Return a list of the last played items."""
+ return [
+ media_from_dict(item)
+ for item in await self.client.send_command(
+ "music/recently_played_items", limit=limit, media_types=media_types
+ )
+ ]
+
async def get_item_by_uri(
self,
uri: str,
"""Get single music item providing a mediaitem uri."""
return media_from_dict(await self.client.send_command("music/item_by_uri", uri=uri))
- async def refresh_item(
- self,
- media_item: MediaItemType,
- ) -> MediaItemType | None:
- """Try to refresh a mediaitem by requesting it's full object or search for substitutes."""
- if result := await self.client.send_command("music/refresh_item", media_item=media_item):
- return media_from_dict(result)
- return None
-
async def get_item(
self,
media_type: MediaType,
item_id: str,
provider_instance_id_or_domain: str,
+ force_refresh: bool = False,
+ lazy: bool = True,
+ add_to_library: bool = False,
) -> MediaItemType:
"""Get single music item by id and media type."""
return media_from_dict(
media_type=media_type,
item_id=item_id,
provider_instance_id_or_domain=provider_instance_id_or_domain,
+ force_refresh=force_refresh,
+ lazy=lazy,
+ add_to_library=add_to_library,
)
)
- async def add_item_to_library(self, item: str | MediaItemType) -> MediaItemType:
- """Add item (uri or mediaitem) to the library."""
- await self.client.send_command("music/library/add_item", item=item)
-
- async def remove_item_from_library(
- self, media_type: MediaType, library_item_id: str | int
- ) -> None:
- """
- Remove item from the library.
-
- Destructive! Will remove the item and all dependants.
- """
- await self.client.send_command(
- "music/library/remove",
- media_type=media_type,
- library_item_id=library_item_id,
- )
-
async def add_item_to_favorites(
self,
- media_type: MediaType,
- item_id: str,
- provider_instance_id_or_domain: str,
+ item: str | MediaItemType,
) -> None:
"""Add an item to the favorites."""
- await self.client.send_command(
- "music/favorites/add_item",
- media_type=media_type,
- item_id=item_id,
- provider_instance_id_or_domain=provider_instance_id_or_domain,
- )
+ await self.client.send_command("music/favorites/add_item", item=item)
async def remove_item_from_favorites(
self,
item_id=item_id,
)
- async def browse(
- self,
- path: str | None = None,
- ) -> list[MediaItemType]:
- """Browse Music providers."""
- return [
- media_from_dict(item)
- for item in await self.client.send_command("music/browse", path=path)
- ]
+ async def remove_item_from_library(
+ self, media_type: MediaType, library_item_id: str | int
+ ) -> None:
+ """
+ Remove item from the library.
- async def search(
- self,
- search_query: str,
- media_types: tuple[MediaType] = MediaType.ALL,
- limit: int = 25,
- ) -> SearchResults:
- """Perform global search for media items on all providers."""
- return SearchResults.from_dict(
- await self.client.send_command(
- "music/search",
- search_query=search_query,
- media_types=media_types,
- limit=limit,
- ),
+ Destructive! Will remove the item and all dependants.
+ """
+ await self.client.send_command(
+ "music/library/remove_item",
+ media_type=media_type,
+ library_item_id=library_item_id,
)
- async def get_sync_tasks(self) -> list[SyncTask]:
- """Return any/all sync tasks that are in progress on the server."""
- return [SyncTask(**item) for item in await self.client.send_command("music/synctasks")]
+ async def add_item_to_library(self, item: str | MediaItemType) -> MediaItemType:
+ """Add item (uri or mediaitem) to the library."""
+ return await self.client.send_command("music/library/add_item", item=item)
+
+ async def refresh_item(
+ self,
+ media_item: MediaItemType,
+ ) -> MediaItemType | None:
+ """Try to refresh a mediaitem by requesting it's full object or search for substitutes."""
+ if result := await self.client.send_command("music/refresh_item", media_item=media_item):
+ return media_from_dict(result)
+ return None
+
+ # helpers
+
+ def get_media_item_image(
+ self,
+ item: MediaItemType | ItemMapping,
+ type: ImageType = ImageType.THUMB, # noqa: A002
+ ) -> MediaItemImage | None:
+ """Get MediaItemImage for MediaItem, ItemMapping."""
+ if not item:
+ # guard for unexpected bad things
+ return None
+ # handle image in itemmapping
+ if item.image and item.image.type == type:
+ return item.image
+ # always prefer album image for tracks
+ album: Album | ItemMapping | None
+ if album := getattr(item, "album", None):
+ if album_image := self.get_media_item_image(album, type):
+ return album_image
+ # handle regular image within mediaitem
+ metadata: MediaItemMetadata
+ if metadata := getattr(item, "metadata", None):
+ for img in metadata.images or []:
+ if img.type == type:
+ return img
+ # retry with album/track artist(s)
+ artists: list[Artist | ItemMapping] | None
+ if artists := getattr(item, "artists", None):
+ for artist in artists:
+ if artist_image := self.get_media_item_image(artist, type):
+ return artist_image
+ # allow landscape fallback
+ if type == ImageType.THUMB:
+ return self.get_media_item_image(item, ImageType.LANDSCAPE)
+ return None
--- /dev/null
+"""Handle PlayerQueues related endpoints for Music Assistant."""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from music_assistant.common.models.enums import EventType, QueueOption, RepeatMode
+from music_assistant.common.models.media_items import PagedItems
+from music_assistant.common.models.player_queue import PlayerQueue
+from music_assistant.common.models.queue_item import QueueItem
+
+if TYPE_CHECKING:
+ from collections.abc import Iterator
+
+ from music_assistant.common.models.event import MassEvent
+ from music_assistant.common.models.media_items import MediaItemType
+
+ from .client import MusicAssistantClient
+
+
+class PlayerQueues:
+ """PlayerQueue related endpoints/data for Music Assistant."""
+
+ def __init__(self, client: MusicAssistantClient) -> None:
+ """Handle Initialization."""
+ self.client = client
+ # subscribe to player events
+ client.subscribe(
+ self._handle_event,
+ (
+ EventType.QUEUE_ADDED,
+ EventType.QUEUE_UPDATED,
+ ),
+ )
+ # the initial items are retrieved after connect
+ self._queues: dict[str, PlayerQueue] = {}
+
+ @property
+ def player_queues(self) -> list[PlayerQueue]:
+ """Return all player queues."""
+ return list(self._queues.values())
+
+ def __iter__(self) -> Iterator[PlayerQueue]:
+ """Iterate over (available) PlayerQueues."""
+ return iter(self._queues.values())
+
+ def get(self, queue_id: str) -> PlayerQueue | None:
+ """Return PlayerQueue by ID (or None if not found)."""
+ return self._queues.get(queue_id)
+
+ # PlayerQueue related endpoints/commands
+
+ async def get_player_queue_items(
+ self, queue_id: str, limit: int = 500, offset: int = 0
+ ) -> PagedItems[QueueItem]:
+ """Get all QueueItems for given PlayerQueue."""
+ return PagedItems.parse(
+ await self.client.send_command(
+ "player_queues/items", queue_id=queue_id, limit=limit, offset=offset
+ )
+ )
+
+ async def get_active_queue(self, player_id: str) -> PlayerQueue:
+ """Return the current active/synced queue for a player."""
+ return PlayerQueue.from_dict(
+ await self.client.send_command("player_queues/get_active_queue", player_id=player_id)
+ )
+
+ async def queue_command_play(self, queue_id: str) -> None:
+ """Send PLAY command to given queue."""
+ await self.client.send_command("player_queues/play", queue_id=queue_id)
+
+ async def queue_command_pause(self, queue_id: str) -> None:
+ """Send PAUSE command to given queue."""
+ await self.client.send_command("player_queues/pause", queue_id=queue_id)
+
+ async def queue_command_stop(self, queue_id: str) -> None:
+ """Send STOP command to given queue."""
+ await self.client.send_command("player_queues/stop", queue_id=queue_id)
+
+ async def queue_command_resume(self, queue_id: str, fade_in: bool | None = None) -> None:
+ """Handle RESUME command for given queue.
+
+ - queue_id: queue_id of the queue to handle the command.
+ """
+ await self.client.send_command("player_queues/resume", queue_id=queue_id, fade_in=fade_in)
+
+ async def queue_command_next(self, queue_id: str) -> None:
+ """Send NEXT TRACK command to given queue."""
+ await self.client.send_command("player_queues/next", queue_id=queue_id)
+
+ async def queue_command_previous(self, queue_id: str) -> None:
+ """Send PREVIOUS TRACK command to given queue."""
+ await self.client.send_command("player_queues/previous", queue_id=queue_id)
+
+ async def queue_command_clear(self, queue_id: str) -> None:
+ """Send CLEAR QUEUE command to given queue."""
+ await self.client.send_command("player_queues/clear", queue_id=queue_id)
+
+ async def queue_command_move_item(
+ self, queue_id: str, queue_item_id: str, pos_shift: int = 1
+ ) -> None:
+ """
+ Move queue item x up/down the queue.
+
+ Parameters:
+ - queue_id: id of the queue to process this request.
+ - queue_item_id: the item_id of the queueitem that needs to be moved.
+ - pos_shift: move item x positions down if positive value
+ - pos_shift: move item x positions up if negative value
+ - pos_shift: move item to top of queue as next item if 0
+
+ NOTE: Fails if the given QueueItem is already playing or loaded in the buffer.
+ """
+ await self.client.send_command(
+ "player_queues/move_item",
+ queue_id=queue_id,
+ queue_item_id=queue_item_id,
+ pos_shift=pos_shift,
+ )
+
+ async def queue_command_move_up(self, queue_id: str, queue_item_id: str) -> None:
+ """Move given queue item one place up in the queue."""
+ await self.queue_command_move_item(
+ queue_id=queue_id, queue_item_id=queue_item_id, pos_shift=-1
+ )
+
+ async def queue_command_move_down(self, queue_id: str, queue_item_id: str) -> None:
+ """Move given queue item one place down in the queue."""
+ await self.queue_command_move_item(
+ queue_id=queue_id, queue_item_id=queue_item_id, pos_shift=1
+ )
+
+ async def queue_command_move_next(self, queue_id: str, queue_item_id: str) -> None:
+ """Move given queue item as next up in the queue."""
+ await self.queue_command_move_item(
+ queue_id=queue_id, queue_item_id=queue_item_id, pos_shift=0
+ )
+
+ async def queue_command_delete(self, queue_id: str, item_id_or_index: int | str) -> None:
+ """Delete item (by id or index) from the queue."""
+ await self.client.send_command(
+ "player_queues/delete_item", queue_id=queue_id, item_id_or_index=item_id_or_index
+ )
+
+ async def queue_command_seek(self, queue_id: str, position: int) -> None:
+ """
+ Handle SEEK command for given queue.
+
+ Parameters:
+ - position: position in seconds to seek to in the current playing item.
+ """
+ await self.client.send_command("player_queues/seek", queue_id=queue_id, position=position)
+
+ async def queue_command_skip(self, queue_id: str, seconds: int) -> None:
+ """
+ Handle SKIP command for given queue.
+
+ Parameters:
+ - seconds: number of seconds to skip in track. Use negative value to skip back.
+ """
+ await self.client.send_command("player_queues/skip", queue_id=queue_id, seconds=seconds)
+
+ async def queue_command_shuffle(self, queue_id: str, shuffle_enabled=bool) -> None:
+ """Configure shuffle mode on the the queue."""
+ await self.client.send_command(
+ "player_queues/shuffle", queue_id=queue_id, shuffle_enabled=shuffle_enabled
+ )
+
+ async def queue_command_repeat(self, queue_id: str, repeat_mode: RepeatMode) -> None:
+ """Configure repeat mode on the the queue."""
+ await self.client.send_command(
+ "player_queues/repeat", queue_id=queue_id, repeat_mode=repeat_mode
+ )
+
+ async def play_index(
+ self,
+ queue_id: str,
+ index: int | str,
+ seek_position: int = 0,
+ fade_in: bool = False,
+ ) -> None:
+ """Play item at index (or item_id) X in queue."""
+ await self.client.send_command(
+ "player_queues/repeat",
+ queue_id=queue_id,
+ index=index,
+ seek_position=seek_position,
+ fade_in=fade_in,
+ )
+
+ async def play_media(
+ self,
+ queue_id: str,
+ media: MediaItemType | list[MediaItemType] | str | list[str],
+ option: QueueOption | None = None,
+ radio_mode: bool = False,
+ start_item: str | None = None,
+ ) -> None:
+ """
+ Play media item(s) on the given queue.
+
+ - media: Media that should be played (MediaItem(s) or uri's).
+ - queue_opt: Which enqueue mode to use.
+ - radio_mode: Enable radio mode for the given item(s).
+ - start_item: Optional item to start the playlist or album from.
+ """
+ await self.client.send_command(
+ "player_queues/play_media",
+ queue_id=queue_id,
+ media=media,
+ option=option,
+ radio_mode=radio_mode,
+ start_item=start_item,
+ )
+
+ # Other endpoints/commands
+
+ async def _get_player_queues(self) -> list[PlayerQueue]:
+ """Fetch all PlayerQueues from the server."""
+ return [
+ PlayerQueue.from_dict(item)
+ for item in await self.client.send_command("player_queues/all")
+ ]
+
+ async def fetch_state(self) -> None:
+ """Fetch initial state once the server is connected."""
+ for queue in await self._get_player_queues():
+ self._queues[queue.queue_id] = queue
+
+ def _handle_event(self, event: MassEvent) -> None:
+ """Handle incoming player(queue) event."""
+ if event.event in (EventType.QUEUE_ADDED, EventType.QUEUE_UPDATED):
+ self._queues[event.object_id] = PlayerQueue.from_dict(event.data)
from typing import TYPE_CHECKING
-from music_assistant.common.models.enums import EventType, QueueOption, RepeatMode
+from music_assistant.common.models.enums import EventType
from music_assistant.common.models.player import Player
-from music_assistant.common.models.player_queue import PlayerQueue
-from music_assistant.common.models.queue_item import QueueItem
if TYPE_CHECKING:
from collections.abc import Iterator
from music_assistant.common.models.event import MassEvent
- from music_assistant.common.models.media_items import MediaItemType
from .client import MusicAssistantClient
EventType.PLAYER_ADDED,
EventType.PLAYER_REMOVED,
EventType.PLAYER_UPDATED,
- EventType.QUEUE_ADDED,
- EventType.QUEUE_UPDATED,
),
)
- # below items are retrieved after connect
+ # the initial items are retrieved after connect
self._players: dict[str, Player] = {}
- self._queues: dict[str, PlayerQueue] = {}
@property
def players(self) -> list[Player]:
"""Return all players."""
return list(self._players.values())
- @property
- def player_queues(self) -> list[PlayerQueue]:
- """Return all player queues."""
- return list(self._queues.values())
-
def __iter__(self) -> Iterator[Player]:
"""Iterate over (available) players."""
return iter(self._players.values())
- def get_player(self, player_id: str) -> Player | None:
+ def get(self, player_id: str) -> Player | None:
"""Return Player by ID (or None if not found)."""
return self._players.get(player_id)
- def get_player_queue(self, queue_id: str) -> PlayerQueue | None:
- """Return PlayerQueue by ID (or None if not found)."""
- return self._queues.get(queue_id)
-
# Player related endpoints/commands
- async def get_players(self) -> list[Player]:
- """Fetch all Players from the server."""
- return [Player.from_dict(item) for item in await self.client.send_command("players/all")]
-
async def player_command_stop(self, player_id: str) -> None:
"""Send STOP command to given player (directly)."""
await self.client.send_command("players/cmd/stop", player_id=player_id)
+ async def player_command_play(self, player_id: str) -> None:
+ """Send PLAY command to given player (directly)."""
+ await self.client.send_command("players/cmd/play", player_id=player_id)
+
+ async def player_command_pause(self, player_id: str) -> None:
+ """Send PAUSE command to given player (directly)."""
+ await self.client.send_command("players/cmd/pause", player_id=player_id)
+
+ async def player_command_play_pause(self, player_id: str) -> None:
+ """Send PLAY_PAUSE (toggle) command to given player (directly)."""
+ await self.client.send_command("players/cmd/pause", player_id=player_id)
+
async def player_command_power(self, player_id: str, powered: bool) -> None:
"""Send POWER command to given player."""
await self.client.send_command("players/cmd/power", player_id=player_id, powered=powered)
"""Send VOLUME MUTE command to given player."""
await self.client.send_command("players/cmd/volume_mute", player_id=player_id, muted=muted)
+ async def player_command_seek(self, player_id: str, position: int) -> None:
+ """Handle SEEK command for given player (directly).
+
+ - player_id: player_id of the player to handle the command.
+ - position: position in seconds to seek to in the current playing item.
+ """
+ await self.client.send_command("players/cmd/seek", player_id=player_id, position=position)
+
async def player_command_sync(self, player_id: str, target_player: str) -> None:
"""
Handle SYNC command for given player.
"""
await self.client.send_command("players/cmd/unsync", player_id=player_id)
+ async def cmd_sync_many(self, target_player: str, child_player_ids: list[str]) -> None:
+ """Create temporary sync group by joining given players to target player."""
+ await self.client.send_command(
+ "players/cmd/sync_many", target_player=target_player, child_player_ids=child_player_ids
+ )
+
+ async def cmd_unsync_many(self, player_ids: list[str]) -> None:
+ """Create temporary sync group by joining given players to target player."""
+ await self.client.send_command("players/cmd/unsync_many", player_ids)
+
async def play_announcement(
self,
player_id: str,
# PlayerGroup related endpoints/commands
+ async def create_group(self, provider: str, name: str, members: list[str]) -> Player:
+ """Create new (permanent) Player/Sync Group on given PlayerProvider with name and members.
+
+ - provider: provider domain or instance id to create the new group on.
+ - name: Name for the new group to create.
+ - members: A list of player_id's that should be part of this group.
+
+ Returns the newly created player on success.
+ NOTE: Fails if the given provider does not support creating new groups
+ or members are given that can not be handled by the provider.
+ """
+ return Player.from_dict(
+ await self.client.send_command(
+ "players/create_group", provider=provider, name=name, members=members
+ )
+ )
+
async def set_player_group_volume(self, player_id: str, volume_level: int) -> None:
"""
Send VOLUME_SET command to given playergroup.
"players/cmd/group_volume", player_id=player_id, volume_level=volume_level
)
+ async def set_player_group_power(self, player_id: str, power: bool) -> None:
+ """Handle power command for a (Sync)Group."""
+ await self.client.send_command("players/cmd/group_volume", player_id=player_id, power=power)
+
async def set_player_group_members(self, player_id: str, members: list[str]) -> None:
"""
Update the memberlist of the given PlayerGroup.
"players/cmd/set_members", player_id=player_id, members=members
)
- # PlayerQueue related endpoints/commands
-
- async def get_player_queues(self) -> list[PlayerQueue]:
- """Fetch all PlayerQueues from the server."""
- return [
- PlayerQueue.from_dict(item)
- for item in await self.client.send_command("players/queue/all")
- ]
-
- async def get_player_queue_items(self, queue_id: str) -> list[QueueItem]:
- """Get all QueueItems for given PlayerQueue."""
- return [
- QueueItem.from_dict(item)
- for item in await self.client.send_command("players/queue/items", queue_id=queue_id)
- ]
-
- async def queue_command_play(self, queue_id: str) -> None:
- """Send PLAY command to given queue."""
- await self.client.send_command("players/queue/play", queue_id=queue_id)
-
- async def queue_command_pause(self, queue_id: str) -> None:
- """Send PAUSE command to given queue."""
- await self.client.send_command("players/queue/pause", queue_id=queue_id)
-
- async def queue_command_stop(self, queue_id: str) -> None:
- """Send STOP command to given queue."""
- await self.client.send_command("players/queue/stop", queue_id=queue_id)
-
- async def queue_command_next(self, queue_id: str) -> None:
- """Send NEXT TRACK command to given queue."""
- await self.client.send_command("players/queue/next", queue_id=queue_id)
-
- async def queue_command_previous(self, queue_id: str) -> None:
- """Send PREVIOUS TRACK command to given queue."""
- await self.client.send_command("players/queue/previous", queue_id=queue_id)
-
- async def queue_command_clear(self, queue_id: str) -> None:
- """Send CLEAR QUEUE command to given queue."""
- await self.client.send_command("players/queue/clear", queue_id=queue_id)
-
- async def queue_command_move_item(
- self, queue_id: str, queue_item_id: str, pos_shift: int = 1
- ) -> None:
- """
- Move queue item x up/down the queue.
-
- Parameters:
- - queue_id: id of the queue to process this request.
- - queue_item_id: the item_id of the queueitem that needs to be moved.
- - pos_shift: move item x positions down if positive value
- - pos_shift: move item x positions up if negative value
- - pos_shift: move item to top of queue as next item if 0
-
- NOTE: Fails if the given QueueItem is already player or loaded in the buffer.
- """
- await self.client.send_command(
- "players/queue/move_item",
- queue_id=queue_id,
- queue_item_id=queue_item_id,
- pos_shift=pos_shift,
- )
-
- async def queue_command_move_up(self, queue_id: str, queue_item_id: str) -> None:
- """Move given queue item one place up in the queue."""
- await self.queue_command_move_item(
- queue_id=queue_id, queue_item_id=queue_item_id, pos_shift=-1
- )
-
- async def queue_command_move_down(self, queue_id: str, queue_item_id: str) -> None:
- """Move given queue item one place down in the queue."""
- await self.queue_command_move_item(
- queue_id=queue_id, queue_item_id=queue_item_id, pos_shift=1
- )
-
- async def queue_command_move_next(self, queue_id: str, queue_item_id: str) -> None:
- """Move given queue item as next up in the queue."""
- await self.queue_command_move_item(
- queue_id=queue_id, queue_item_id=queue_item_id, pos_shift=0
- )
-
- async def queue_command_delete(self, queue_id: str, item_id_or_index: int | str) -> None:
- """Delete item (by id or index) from the queue."""
- await self.client.send_command(
- "players/queue/delete_item", queue_id=queue_id, item_id_or_index=item_id_or_index
- )
-
- async def queue_command_seek(self, queue_id: str, position: int) -> None:
- """
- Handle SEEK command for given queue.
-
- Parameters:
- - position: position in seconds to seek to in the current playing item.
- """
- await self.client.send_command("players/queue/seek", queue_id=queue_id, position=position)
-
- async def queue_command_skip(self, queue_id: str, seconds: int) -> None:
- """
- Handle SKIP command for given queue.
-
- Parameters:
- - seconds: number of seconds to skip in track. Use negative value to skip back.
- """
- await self.client.send_command("players/queue/skip", queue_id=queue_id, seconds=seconds)
-
- async def queue_command_shuffle(self, queue_id: str, shuffle_enabled=bool) -> None:
- """Configure shuffle mode on the the queue."""
- await self.client.send_command(
- "players/queue/shuffle", queue_id=queue_id, shuffle_enabled=shuffle_enabled
- )
-
- async def queue_command_repeat(self, queue_id: str, repeat_mode: RepeatMode) -> None:
- """Configure repeat mode on the the queue."""
- await self.client.send_command(
- "players/queue/repeat", queue_id=queue_id, repeat_mode=repeat_mode
- )
-
- async def play_media(
- self,
- queue_id: str,
- media: MediaItemType | list[MediaItemType] | str | list[str],
- option: QueueOption | None = None,
- radio_mode: bool = False,
- ) -> None:
- """
- Play media item(s) on the given queue.
-
- - media: Media that should be played (MediaItem(s) or uri's).
- - queue_opt: Which enqueue mode to use.
- - radio_mode: Enable radio mode for the given item(s).
- """
- await self.client.send_command(
- "players/queue/play_media",
- queue_id=queue_id,
- media=media,
- option=option,
- radio_mode=radio_mode,
- )
-
# Other endpoints/commands
+ async def _get_players(self) -> list[Player]:
+ """Fetch all Players from the server."""
+ return [Player.from_dict(item) for item in await self.client.send_command("players/all")]
+
async def fetch_state(self) -> None:
"""Fetch initial state once the server is connected."""
- for player in await self.get_players():
+ for player in await self._get_players():
self._players[player.player_id] = player
- for queue in await self.get_player_queues():
- self._queues[queue.queue_id] = queue
def _handle_event(self, event: MassEvent) -> None:
- """Handle incoming player(queue) event."""
+ """Handle incoming player event."""
if event.event in (EventType.PLAYER_ADDED, EventType.PLAYER_UPDATED):
self._players[event.object_id] = Player.from_dict(event.data)
return
if event.event == EventType.PLAYER_REMOVED:
self._players.pop(event.object_id, None)
- if event.event in (EventType.QUEUE_ADDED, EventType.QUEUE_UPDATED):
- self._queues[event.object_id] = PlayerQueue.from_dict(event.data)
result: Any = field(default=None, metadata={"serialize": lambda v: get_serializable_value(v)})
-@dataclass
-class ChunkedResultMessage(ResultMessageBase):
- """Message sent when the result of a command is sent in multiple chunks."""
-
- result: Any = field(default=None, metadata={"serialize": lambda v: get_serializable_value(v)})
- is_last_chunk: bool = False
-
-
@dataclass
class ErrorResultMessage(ResultMessageBase):
"""Message sent when a command did not execute successfully."""
return EventMessage.from_dict(raw)
if "error_code" in raw:
return ErrorResultMessage.from_dict(raw)
- if "result" in raw and "is_last_chunk" in raw:
- return ChunkedResultMessage.from_dict(raw)
if "result" in raw:
return SuccessResultMessage.from_dict(raw)
if "sdk_version" in raw:
QUEUE_UPDATED = "queue_updated"
QUEUE_ITEMS_UPDATED = "queue_items_updated"
QUEUE_TIME_UPDATED = "queue_time_updated"
- QUEUE_SETTINGS_UPDATED = "queue_settings_updated"
SHUTDOWN = "application_shutdown"
MEDIA_ITEM_ADDED = "media_item_added"
MEDIA_ITEM_UPDATED = "media_item_updated"
from collections.abc import Iterable
from dataclasses import dataclass, field, fields
-from typing import TYPE_CHECKING, Any, Self, TypeVar, cast
+from typing import TYPE_CHECKING, Any, Generic, Self, TypeVar, cast
from mashumaro import DataClassDictMixin
from music_assistant.common.helpers.global_cache import get_global_cache_value
from music_assistant.common.helpers.uri import create_uri
-from music_assistant.common.helpers.util import create_sort_name, is_valid_uuid, merge_lists
+from music_assistant.common.helpers.util import (
+ create_sort_name,
+ is_valid_uuid,
+ merge_lists,
+)
from music_assistant.common.models.enums import (
AlbumType,
ContentType,
# having items for unavailable providers can have all sorts
# of unpredictable results so ensure we have accurate availability status
if not (available_providers := get_global_cache_value("unique_providers")):
- self.available = False
+ # this is probably the client
+ self.available = self.available
return
if TYPE_CHECKING:
available_providers = cast(set[str], available_providers)
elif isinstance(cur_val, set) and isinstance(new_val, set | list | tuple):
new_val = cur_val.update(new_val)
setattr(self, fld.name, new_val)
- elif new_val and fld.name in ("popularity", "last_refresh", "cache_checksum"):
+ elif new_val and fld.name in (
+ "popularity",
+ "last_refresh",
+ "cache_checksum",
+ ):
# some fields are always allowed to be overwritten
# (such as checksum and last_refresh)
setattr(self, fld.name, new_val)
artists: UniqueList[Artist | ItemMapping] = field(default_factory=UniqueList)
album_type: AlbumType = AlbumType.UNKNOWN
+ @property
+ def artist_str(self) -> str:
+ """Return (combined) artist string for track."""
+ return "/".join(x.name for x in self.artists)
+
@dataclass(kw_only=True)
class Track(MediaItem):
MediaItemType = Artist | Album | Track | Radio | Playlist | BrowseFolder
-@dataclass(kw_only=True)
-class PagedItems(DataClassDictMixin):
+class PagedItems(Generic[_T]):
"""Model for a paged listing."""
- items: list[MediaItemType]
- count: int
- limit: int
- offset: int
- total: int | None = None
+ def __init__(
+ self,
+ items: list[_T],
+ limit: int,
+ offset: int,
+ count: int | None = None,
+ total: int | None = None,
+ ):
+ """Initialize PagedItems."""
+ self.items = items
+ self.count = count = count or len(items)
+ self.limit = limit
+ self.offset = offset
+ self.total = total
+ if total is None and offset == 0 and count != limit:
+ self.total = count
+ if total is None and offset and count < limit:
+ self.total = offset + count
+
+ def to_dict(self, *args, **kwargs) -> dict[str, Any]:
+ """Return PagedItems as serializable dict."""
+ return {
+ "items": [x.to_dict() for x in self.items],
+ "count": self.count,
+ "limit": self.limit,
+ "offset": self.offset,
+ "total": self.total,
+ }
@classmethod
- def parse(cls, raw: dict[str, Any], item_type: type) -> PagedItems:
+ def parse(cls, raw: dict[str, Any], item_type: type[MediaItemType]) -> Self[MediaItemType]:
"""Parse PagedItems object including correct item type."""
return PagedItems(
items=[item_type.from_dict(x) for x in raw["items"]],
import asyncio
from dataclasses import dataclass, field
-from typing import Any, TypedDict
+from typing import Any
from mashumaro.mixins.orjson import DataClassORJSONMixin
return await load_json_file(manifest_file, ProviderManifest)
-class ProviderInstance(TypedDict):
- """Provider instance detailed dict when a provider is serialized over the api."""
+@dataclass
+class ProviderInstance(DataClassORJSONMixin):
+ """Provider instance details when a provider is serialized over the api."""
type: ProviderType
domain: str
instance_id: str
supported_features: list[ProviderFeature]
available: bool
- icon: str | None
+ icon: str | None = None
is_streaming_provider: bool | None = None # music providers only
@api_command("config/players/get")
async def get_player_config(self, player_id: str) -> PlayerConfig:
- """Return configuration for a single player."""
+ """Return (full) configuration for a single player."""
if raw_conf := self.get(f"{CONF_PLAYERS}/{player_id}"):
if prov := self.mass.get_provider(raw_conf["provider"]):
conf_entries = await prov.get_player_config_entries(player_id)
Album,
AlbumType,
Artist,
+ ItemMapping,
MediaType,
PagedItems,
Track,
query = f"WHERE {DB_TABLE_ALBUM_ARTISTS}.artist_id = {item_id}"
return await self.mass.music.albums._get_library_items_by_query(extra_query=query)
- async def _add_library_item(self, item: Artist) -> int:
+ async def _add_library_item(self, item: Artist | ItemMapping) -> int:
"""Add a new item record to the database."""
+ if isinstance(item, ItemMapping):
+ item = Artist.from_item_mapping(item)
# enforce various artists name + id
if compare_strings(item.name, VARIOUS_ARTISTS_NAME):
item.mbid = VARIOUS_ARTISTS_ID_MBID
return db_id
async def _update_library_item(
- self, item_id: str | int, update: Artist, overwrite: bool = False
+ self, item_id: str | int, update: Artist | ItemMapping, overwrite: bool = False
) -> None:
"""Update existing record in the database."""
db_id = int(item_id) # ensure integer
cur_item = await self.get_library_item(db_id)
- metadata = update.metadata if overwrite else cur_item.metadata.update(update.metadata)
+ if isinstance(update, ItemMapping):
+ # NOTE that artist is the only mediatype where its accepted we
+ # receive an itemmapping from streaming providers
+ update = Artist.from_item_mapping(update)
+ metadata = cur_item.metadata
+ else:
+ metadata = update.metadata if overwrite else cur_item.metadata.update(update.metadata)
cur_item.external_ids.update(update.external_ids)
# enforce various artists name + id
mbid = cur_item.mbid
from music_assistant.common.helpers.json import json_loads, serialize_to_json
from music_assistant.common.models.enums import EventType, ExternalID, MediaType, ProviderFeature
-from music_assistant.common.models.errors import (
- InvalidDataError,
- MediaNotFoundError,
- ProviderUnavailableError,
-)
+from music_assistant.common.models.errors import MediaNotFoundError, ProviderUnavailableError
from music_assistant.common.models.media_items import (
Album,
ItemMapping,
break
if library_id is None:
# actually add a new item in the library db
- if not item.provider_mappings:
- msg = "Item is missing provider mapping(s)"
- raise InvalidDataError(msg)
async with self._db_add_lock:
library_id = await self._add_library_item(item)
new_item = True
order_by: str = "sort_name",
extra_query: str | None = None,
extra_query_params: dict[str, Any] | None = None,
- ) -> PagedItems:
+ ) -> PagedItems[ItemCls]:
"""Get in-database items."""
items = await self._get_library_items_by_query(
favorite=favorite,
extra_query_params=extra_query_params,
count_only=True,
)
- return PagedItems(items=items, count=count, limit=limit, offset=offset, total=total)
+ return PagedItems(items=items, limit=limit, offset=offset, count=count, total=total)
async def iter_library_items(
self,
from __future__ import annotations
import random
-from collections.abc import AsyncGenerator
-from typing import TYPE_CHECKING, Any, cast
+from typing import Any
from music_assistant.common.helpers.json import serialize_to_json
from music_assistant.common.helpers.uri import create_uri, parse_uri
ProviderUnavailableError,
UnsupportedFeaturedException,
)
-from music_assistant.common.models.media_items import Playlist, PlaylistTrack, Track
+from music_assistant.common.models.media_items import PagedItems, Playlist, PlaylistTrack, Track
from music_assistant.constants import DB_TABLE_PLAYLISTS
from music_assistant.server.models.music_provider import MusicProvider
item_id: str,
provider_instance_id_or_domain: str,
force_refresh: bool = False,
- ) -> AsyncGenerator[PlaylistTrack, None]:
+ offset: int = 0,
+ limit: int = 50,
+ ) -> PagedItems[PlaylistTrack]:
"""Return playlist tracks for the given provider playlist id."""
playlist = await self.get(
item_id,
force_refresh=force_refresh,
)
prov = next(x for x in playlist.provider_mappings)
- count = 0
- async for track in self._get_provider_playlist_tracks(
+ tracks = await self._get_provider_playlist_tracks(
prov.item_id,
prov.provider_instance,
cache_checksum=playlist.metadata.cache_checksum,
- ):
- count += 1
- yield track
- if count == 2500:
- self.logger.warning(
- "Playlist %s has more than 2500 tracks - this will hurt performance!",
- playlist.name,
- )
+ offset=offset,
+ limit=limit,
+ )
+ return PagedItems(items=tracks, limit=limit, offset=offset)
async def create_playlist(
self, name: str, provider_instance_or_domain: str | None = None
item_id: str,
provider_instance_id_or_domain: str,
cache_checksum: Any = None,
- ) -> AsyncGenerator[PlaylistTrack, None]:
- """Return album tracks for the given provider album id."""
+ offset: int = 0,
+ limit: int = 100,
+ ) -> list[PlaylistTrack]:
+ """Return playlist tracks for the given provider playlist id."""
assert provider_instance_id_or_domain != "library"
provider: MusicProvider = self.mass.get_provider(provider_instance_id_or_domain)
if not provider:
- return
+ return []
# prefer cache items (if any)
- cache_key = f"{provider.lookup_key}.playlist.{item_id}.tracks"
+ cache_key = f"{provider.lookup_key}.playlist.{item_id}.tracks.{offset}.{limit}"
if (cache := await self.mass.cache.get(cache_key, checksum=cache_checksum)) is not None:
- for track_dict in cache:
- yield PlaylistTrack.from_dict(track_dict)
- return
+ return [PlaylistTrack.from_dict(x) for x in cache]
# no items in cache - get listing from provider
- all_items = []
- async for item in provider.get_playlist_tracks(item_id):
+ result: list[Track] = []
+ for item in await provider.get_playlist_tracks(item_id, offset=offset, limit=limit):
# double check if position set
assert item.position is not None, "Playlist items require position to be set"
- yield cast(PlaylistTrack, item) if TYPE_CHECKING else item
- all_items.append(item)
+ result.append(item)
# if this is a complete track object, pre-cache it as
# that will save us an (expensive) lookup later
if item.image and item.artist_str and item.album and provider.domain != "builtin":
if cache_checksum != "no_cache":
self.mass.create_task(
self.mass.cache.set(
- cache_key, [x.to_dict() for x in all_items], checksum=cache_checksum
+ cache_key, [x.to_dict() for x in result], checksum=cache_checksum
)
)
+ return result
async def _get_provider_dynamic_tracks(
self,
playlist_genres: dict[str, int] = {}
# retrieve metedata for the playlist from the tracks (such as genres etc.)
# TODO: retrieve style/mood ?
- async for track in self.mass.music.playlists.tracks(playlist.item_id, playlist.provider):
+ playlist_items = await self.mass.music.playlists.tracks(playlist.item_id, playlist.provider)
+ for track in playlist_items.items:
if track.image:
all_playlist_tracks_images.add(track.image)
if track.metadata.genres:
# create collage images
cur_images = playlist.metadata.images or []
new_images = []
+ # thumb image
thumb_image = next((x for x in cur_images if x.type == ImageType.THUMB), None)
if not thumb_image or self._collage_images_dir in thumb_image.path:
thumb_image_path = (
all_playlist_tracks_images, thumb_image_path
):
new_images.append(collage_thumb_image)
- elif thumb_image:
- # just use old image
- new_images.append(thumb_image)
+ elif thumb_image:
+ # just use old image
+ new_images.append(thumb_image)
+ # fanart image
fanart_image = next((x for x in cur_images if x.type == ImageType.FANART), None)
if not fanart_image or self._collage_images_dir in fanart_image.path:
fanart_image_path = (
all_playlist_tracks_images, fanart_image_path, fanart=True
):
new_images.append(collage_fanart_image)
- elif fanart_image:
- # just use old image
- new_images.append(fanart_image)
+ elif fanart_image:
+ # just use old image
+ new_images.append(fanart_image)
playlist.metadata.images = new_images
# set timestamp, used to determine when this function was last called
playlist.metadata.last_refresh = int(time())
import logging
import os
import shutil
-from collections.abc import AsyncGenerator
from contextlib import suppress
from itertools import zip_longest
from typing import TYPE_CHECKING
MusicAssistantError,
ProviderUnavailableError,
)
-from music_assistant.common.models.media_items import BrowseFolder, MediaItemType, SearchResults
+from music_assistant.common.models.media_items import (
+ BrowseFolder,
+ MediaItemType,
+ PagedItems,
+ SearchResults,
+)
from music_assistant.common.models.provider import SyncTask
from music_assistant.common.models.streamdetails import LoudnessMeasurement
from music_assistant.constants import (
return result
@api_command("music/browse")
- async def browse(self, path: str | None = None) -> AsyncGenerator[MediaItemType, None]:
+ async def browse(
+ self, offset: int, limit: int, path: str | None = None
+ ) -> PagedItems[MediaItemType]:
"""Browse Music providers."""
if not path or path == "root":
# root level; folder per provider
+ root_items: list[MediaItemType] = []
for prov in self.providers:
if ProviderFeature.BROWSE not in prov.supported_features:
continue
- yield BrowseFolder(
- item_id="root",
- provider=prov.domain,
- path=f"{prov.instance_id}://",
- uri=f"{prov.instance_id}://",
- name=prov.name,
+ root_items.append(
+ BrowseFolder(
+ item_id="root",
+ provider=prov.domain,
+ path=f"{prov.instance_id}://",
+ uri=f"{prov.instance_id}://",
+ name=prov.name,
+ )
)
- return
+ return PagedItems(items=root_items, limit=limit, offset=offset)
# provider level
+ prepend_items: list[MediaItemType] = []
provider_instance, sub_path = path.split("://", 1)
prov = self.mass.get_provider(provider_instance)
# handle regular provider listing, always add back folder first
if not prov or not sub_path:
- yield BrowseFolder(item_id="root", provider="library", path="root", name="..")
+ prepend_items.append(
+ BrowseFolder(item_id="root", provider="library", path="root", name="..")
+ )
if not prov:
- return
+ return PagedItems(items=prepend_items, limit=limit, offset=offset)
else:
back_path = f"{provider_instance}://" + "/".join(sub_path.split("/")[:-1])
- yield BrowseFolder(
- item_id="back", provider=provider_instance, path=back_path, name=".."
+ prepend_items.append(
+ BrowseFolder(item_id="back", provider=provider_instance, path=back_path, name="..")
)
- async for item in prov.browse(path):
- yield item
+ prov_items = await prov.browse(path, offset, limit)
+ prov_items.items = prepend_items + prov_items.items
+ return prov_items
@api_command("music/recently_played_items")
async def recently_played(
f"UPDATE {ctrl.db_table} SET play_count = play_count + 1, "
f"last_played = {timestamp} WHERE item_id = {item_id}"
)
- await self._db.commit()
+ await self.database.commit()
def get_controller(
self, media_type: MediaType
import asyncio
import random
import time
-from collections.abc import AsyncGenerator
from contextlib import suppress
from typing import TYPE_CHECKING, Any, TypedDict
PlayerUnavailableError,
QueueEmpty,
)
-from music_assistant.common.models.media_items import AlbumTrack, MediaItemType, media_from_dict
+from music_assistant.common.models.media_items import (
+ AlbumTrack,
+ MediaItemType,
+ PagedItems,
+ media_from_dict,
+)
from music_assistant.common.models.player import PlayerMedia
from music_assistant.common.models.player_queue import PlayerQueue
from music_assistant.common.models.queue_item import QueueItem
"""Iterate over (available) players."""
return iter(self._queues.values())
- @api_command("players/queue/all")
+ @api_command("player_queues/all")
def all(self) -> tuple[PlayerQueue, ...]:
"""Return all registered PlayerQueues."""
return tuple(self._queues.values())
- @api_command("players/queue/get")
+ @api_command("player_queues/get")
def get(self, queue_id: str) -> PlayerQueue | None:
"""Return PlayerQueue by queue_id or None if not found."""
return self._queues.get(queue_id)
- @api_command("players/queue/items")
- async def items(self, queue_id: str) -> AsyncGenerator[QueueItem, None]:
+ @api_command("player_queues/items")
+ def items(self, queue_id: str, limit: int = 500, offset: int = 0) -> PagedItems[QueueItem]:
"""Return all QueueItems for given PlayerQueue."""
- # because the QueueItems can potentially be a very large list, this is a async generator
- for index, queue_item in enumerate(self._queue_items.get(queue_id, [])):
- queue_item.index = index
- yield queue_item
+ if queue_id not in self._queue_items:
+ return PagedItems(items=[], limit=limit, offset=offset)
+
+ return PagedItems(
+ items=self._queue_items[queue_id][offset:limit],
+ limit=limit,
+ offset=offset,
+ total=len(self._queue_items[queue_id]),
+ )
- @api_command("players/queue/get_active_queue")
+ @api_command("player_queues/get_active_queue")
def get_active_queue(self, player_id: str) -> PlayerQueue:
"""Return the current active/synced queue for a player."""
if player := self.mass.players.get(player_id):
# Queue commands
- @api_command("players/queue/shuffle")
+ @api_command("player_queues/shuffle")
def set_shuffle(self, queue_id: str, shuffle_enabled: bool) -> None:
"""Configure shuffle setting on the the queue."""
if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress:
shuffle=shuffle_enabled,
)
- @api_command("players/queue/repeat")
+ @api_command("player_queues/repeat")
def set_repeat(self, queue_id: str, repeat_mode: RepeatMode) -> None:
"""Configure repeat setting on the the queue."""
if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress:
queue.repeat_mode = repeat_mode
self.signal_update(queue_id)
- @api_command("players/queue/play_media")
+ @api_command("player_queues/play_media")
async def play_media(
self,
queue_id: str,
queue.items = len(queue_items)
self.signal_update(queue_id)
- @api_command("players/queue/move_item")
+ @api_command("player_queues/move_item")
def move_item(self, queue_id: str, queue_item_id: str, pos_shift: int = 1) -> None:
"""
Move queue item x up/down the queue.
queue_items.insert(new_index, queue_items.pop(item_index))
self.update_items(queue_id, queue_items)
- @api_command("players/queue/delete_item")
+ @api_command("player_queues/delete_item")
def delete_item(self, queue_id: str, item_id_or_index: int | str) -> None:
"""Delete item (by id or index) from the queue."""
if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress:
queue_items.pop(item_index)
self.update_items(queue_id, queue_items)
- @api_command("players/queue/clear")
+ @api_command("player_queues/clear")
def clear(self, queue_id: str) -> None:
"""Clear all items in the queue."""
if (player := self.mass.players.get(queue_id)) and player.announcement_in_progress:
queue.index_in_buffer = None
self.update_items(queue_id, [])
- @api_command("players/queue/stop")
+ @api_command("player_queues/stop")
async def stop(self, queue_id: str) -> None:
"""
Handle STOP command for given queue.
# simply forward the command to underlying player
await self.mass.players.cmd_stop(queue_id)
- @api_command("players/queue/play")
+ @api_command("player_queues/play")
async def play(self, queue_id: str) -> None:
"""
Handle PLAY command for given queue.
else:
await self.resume(queue_id)
- @api_command("players/queue/pause")
+ @api_command("player_queues/pause")
async def pause(self, queue_id: str) -> None:
"""Handle PAUSE command for given queue.
# simply forward the command to underlying player
await self.mass.players.cmd_pause(queue_id)
- @api_command("players/queue/play_pause")
+ @api_command("player_queues/play_pause")
async def play_pause(self, queue_id: str) -> None:
"""Toggle play/pause on given playerqueue.
return
await self.play(queue_id)
- @api_command("players/queue/next")
+ @api_command("player_queues/next")
async def next(self, queue_id: str) -> None:
"""Handle NEXT TRACK command for given queue.
if (next_index := self._get_next_index(queue_id, current_index, True)) is not None:
await self.play_index(queue_id, next_index)
- @api_command("players/queue/previous")
+ @api_command("player_queues/previous")
async def previous(self, queue_id: str) -> None:
"""Handle PREVIOUS TRACK command for given queue.
return
await self.play_index(queue_id, max(current_index - 1, 0))
- @api_command("players/queue/skip")
+ @api_command("player_queues/skip")
async def skip(self, queue_id: str, seconds: int = 10) -> None:
"""Handle SKIP command for given queue.
return
await self.seek(queue_id, self._queues[queue_id].elapsed_time + seconds)
- @api_command("players/queue/seek")
+ @api_command("player_queues/seek")
async def seek(self, queue_id: str, position: int = 10) -> None:
"""Handle SEEK command for given queue.
assert position < queue.current_item.duration
await self.play_index(queue_id, queue.current_index, position)
- @api_command("players/queue/resume")
+ @api_command("player_queues/resume")
async def resume(self, queue_id: str, fade_in: bool | None = None) -> None:
"""Handle RESUME command for given queue.
msg = f"Resume queue requested but queue {queue_id} is empty"
raise QueueEmpty(msg)
- @api_command("players/queue/play_index")
+ @api_command("player_queues/play_index")
async def play_index(
self,
queue_id: str,
ProviderUnavailableError,
UnsupportedFeaturedException,
)
+from music_assistant.common.models.media_items import UniqueList
from music_assistant.common.models.player import Player, PlayerMedia
from music_assistant.constants import (
CONF_AUTO_PLAY,
"""Return Player by name or None if no match is found."""
return next((x for x in self._players.values() if x.name == name), None)
- @api_command("players/set")
- def set(self, player: Player) -> None:
- """Set/Update player details on the controller."""
- if player.player_id not in self._players:
- # new player
- self.register(player)
- return
- self._players[player.player_id] = player
- self.update(player.player_id)
-
- @api_command("players/register")
- def register(self, player: Player) -> None:
- """Register a new player on the controller."""
- if self.mass.closing:
- return
- player_id = player.player_id
-
- if player_id in self._players:
- msg = f"Player {player_id} is already registered"
- raise AlreadyRegisteredError(msg)
-
- # make sure that the player's provider is set to the instance id
- if prov := self.mass.get_provider(player.provider):
- player.provider = prov.instance_id
- else:
- raise RuntimeError("Invalid provider ID given: %s", player.provider)
-
- # make sure a default config exists
- self.mass.config.create_default_player_config(
- player_id, player.provider, player.name, player.enabled_by_default
- )
-
- player.enabled = self.mass.config.get(f"{CONF_PLAYERS}/{player_id}/enabled", True)
-
- # register playerqueue for this player
- self.mass.create_task(self.mass.player_queues.on_player_register(player))
-
- self._players[player_id] = player
-
- # ignore disabled players
- if not player.enabled:
- return
-
- # initialize sync groups as soon as a player is registered
- self.mass.loop.create_task(self._register_syncgroups())
-
- self.logger.info(
- "Player registered: %s/%s",
- player_id,
- player.name,
- )
- self.mass.signal_event(EventType.PLAYER_ADDED, object_id=player.player_id, data=player)
- # always call update to fix special attributes like display name, group volume etc.
- self.update(player.player_id)
-
- @api_command("players/register_or_update")
- def register_or_update(self, player: Player) -> None:
- """Register a new player on the controller or update existing one."""
- if self.mass.closing:
- return
-
- if player.player_id in self._players:
- self._players[player.player_id] = player
- self.update(player.player_id)
- return
-
- self.register(player)
-
- @api_command("players/remove")
- def remove(self, player_id: str, cleanup_config: bool = True) -> None:
- """Remove a player from the registry."""
- player = self._players.pop(player_id, None)
- if player is None:
- return
- self.logger.info("Player removed: %s", player.name)
- self.mass.player_queues.on_player_remove(player_id)
- if cleanup_config:
- self.mass.config.remove(f"players/{player_id}")
- self._prev_states.pop(player_id, None)
- self.mass.signal_event(EventType.PLAYER_REMOVED, player_id)
-
- @api_command("players/update")
- def update(
- self, player_id: str, skip_forward: bool = False, force_update: bool = False
- ) -> None:
- """Update player state."""
- if self.mass.closing:
- return
- if player_id not in self._players:
- return
- player = self._players[player_id]
- # calculate active group and active source
- player.active_group = self._get_active_player_group(player)
- player.active_source = self._get_active_source(player)
- player.volume_level = player.volume_level or 0 # guard for None volume
- # calculate group volume
- player.group_volume = self._get_group_volume_level(player)
- if player.type in (PlayerType.GROUP, PlayerType.SYNC_GROUP):
- player.volume_level = player.group_volume
- # prefer any overridden name from config
- player.display_name = (
- self.mass.config.get_raw_player_config_value(player.player_id, "name")
- or player.name
- or player.player_id
- )
- if (
- not player.powered
- and player.state == PlayerState.PLAYING
- and PlayerFeature.POWER not in player.supported_features
- and player.active_source == player_id
- ):
- # mark player as powered if its playing
- # could happen for players that do not officially support power commands
- player.powered = True
- player.hidden = self.mass.config.get_raw_player_config_value(
- player.player_id, CONF_HIDE_PLAYER, False
- )
- player.icon = self.mass.config.get_raw_player_config_value(
- player.player_id,
- CONF_ENTRY_PLAYER_ICON.key,
- CONF_ENTRY_PLAYER_ICON_GROUP.default_value
- if player.type in (PlayerType.GROUP, PlayerType.SYNC_GROUP)
- else CONF_ENTRY_PLAYER_ICON.default_value,
- )
- # handle syncgroup - get attributes from first player that has this group as source
- if player.player_id.startswith(SYNCGROUP_PREFIX):
- if player.powered and (sync_leader := self.get_sync_leader(player)):
- player.state = sync_leader.state
- player.current_item_id = sync_leader.current_item_id
- player.elapsed_time = sync_leader.elapsed_time
- player.elapsed_time_last_updated = sync_leader.elapsed_time_last_updated
- else:
- player.state = PlayerState.IDLE
-
- # basic throttle: do not send state changed events if player did not actually change
- prev_state = self._prev_states.get(player_id, {})
- new_state = self._players[player_id].to_dict()
- changed_values = get_changed_values(
- prev_state,
- new_state,
- ignore_keys=["elapsed_time", "elapsed_time_last_updated", "seq_no"],
- )
- self._prev_states[player_id] = new_state
-
- if not player.enabled and not force_update:
- # ignore updates for disabled players
- return
-
- # always signal update to the playerqueue
- self.mass.player_queues.on_player_update(player, changed_values)
-
- if len(changed_values) == 0 and not force_update:
- return
-
- self.mass.signal_event(EventType.PLAYER_UPDATED, object_id=player_id, data=player)
-
- if skip_forward:
- return
- # update/signal group player(s) child's when group updates
- if player.type in (PlayerType.GROUP, PlayerType.SYNC_GROUP):
- for child_player in self.iter_group_members(player):
- if child_player.player_id == player.player_id:
- continue
- self.update(child_player.player_id, skip_forward=True)
- # update/signal group player(s) when child updates
- for group_player in self._get_player_groups(player, powered_only=False):
- player_prov = self.get_player_provider(group_player.player_id)
- if not player_prov:
- continue
- if group_player.player_id.startswith(SYNCGROUP_PREFIX):
- self.update(group_player.player_id, skip_forward=True)
- else:
- self.mass.create_task(player_prov.poll_player(group_player.player_id))
-
- def get_player_provider(self, player_id: str) -> PlayerProvider:
- """Return PlayerProvider for given player."""
- player = self._players[player_id]
- player_provider = self.mass.get_provider(player.provider)
- return cast(PlayerProvider, player_provider)
-
# Player commands
@api_command("players/cmd/stop")
@api_command("players/cmd/seek")
async def cmd_seek(self, player_id: str, position: int) -> None:
- """Handle SEEK command for given queue.
+ """Handle SEEK command for given player (directly).
- player_id: player_id of the player to handle the command.
- position: position in seconds to seek to in the current playing item.
finally:
player.announcement_in_progress = False
- @api_command("players/cmd/play_media")
async def play_media(self, player_id: str, media: PlayerMedia) -> None:
"""Handle PLAY MEDIA on given player.
media=media,
)
- @api_command("players/cmd/enqueue_next_media")
async def enqueue_next_media(self, player_id: str, media: PlayerMedia) -> None:
"""Handle enqueuing of a next media item on the player."""
if player_id.startswith(SYNCGROUP_PREFIX):
- player_id: player_id of the player to handle the command.
- target_player: player_id of the syncgroup leader or group player.
"""
- child_player = self.get(player_id, True)
- parent_player = self.get(target_player, True)
- assert child_player
- assert parent_player
- if PlayerFeature.SYNC not in child_player.supported_features:
- msg = f"Player {child_player.name} does not support (un)sync commands"
- raise UnsupportedFeaturedException(msg)
- if PlayerFeature.SYNC not in parent_player.supported_features:
- msg = f"Player {parent_player.name} does not support (un)sync commands"
- raise UnsupportedFeaturedException(msg)
- if player_id == target_player:
- return
- if child_player.synced_to:
- if child_player.synced_to == parent_player.player_id:
- # nothing to do: already synced to this parent
- return
- # player already synced, unsync first
- await self.cmd_unsync(child_player.player_id)
- elif child_player.state == PlayerState.PLAYING:
- # stop child player if it is currently playing
- await self.cmd_stop(player_id)
- if player_id not in parent_player.can_sync_with:
- raise RuntimeError(
- f"Player {child_player.display_name} can not "
- f"be synced with {parent_player.display_name}",
- )
- # all checks passed, forward command to the player provider
- player_provider = self.get_player_provider(player_id)
- await player_provider.cmd_sync(player_id, target_player)
+ await self.cmd_sync_many(target_player, [player_id])
@api_command("players/cmd/unsync")
@handle_player_command
- player_id: player_id of the player to handle the command.
"""
- player = self.get(player_id, True)
- if PlayerFeature.SYNC not in player.supported_features:
- msg = f"Player {player.name} does not support syncing"
+ await self.cmd_unsync_many([player_id])
+
+ @api_command("players/cmd/sync_many")
+ @handle_player_command
+ async def cmd_sync_many(self, target_player: str, child_player_ids: list[str]) -> None:
+ """Create temporary sync group by joining given players to target player."""
+ parent_player: Player = self.get(target_player, True)
+ if PlayerFeature.SYNC not in parent_player.supported_features:
+ msg = f"Player {parent_player.name} does not support (un)sync commands"
raise UnsupportedFeaturedException(msg)
- if not player.synced_to:
- self.logger.info(
- "Ignoring command to unsync player %s "
- "because it is currently not synced to another player.",
- player.display_name,
- )
+ # filter all player ids on compatibility and availability
+ final_player_ids: UniqueList[str] = UniqueList()
+ for child_player_id in child_player_ids:
+ if child_player_id == target_player:
+ continue
+ if not (child_player := self.get(child_player_id)):
+ self.logger.warning("Player %s is not available", child_player_id)
+ continue
+ if PlayerFeature.SYNC not in child_player.supported_features:
+ self.logger.warning(
+ "Player %s does not support (un)sync commands", child_player.name
+ )
+ continue
+ if child_player.synced_to and child_player.synced_to != target_player:
+ # player already synced to another player, unsync first
+ self.logger.warning(
+ "Player %s is already synced, unsyncing first", child_player.name
+ )
+ await self.cmd_unsync(child_player.player_id)
+
+ if child_player_id not in parent_player.can_sync_with:
+ self.logger.warning(
+ "Player %s can not be synced with %s",
+ child_player.display_name,
+ parent_player.display_name,
+ )
+ continue
+ # if we reach here, all checks passed
+ final_player_ids.add(child_player_id)
+
+ # forward command to the player provider after all (base) sanity checks
+ player_provider = self.get_player_provider(target_player)
+ await player_provider.cmd_sync_many(target_player, child_player_ids)
+
+ @api_command("players/cmd/unsync_many")
+ @handle_player_command
+ async def cmd_unsync_many(self, player_ids: list[str]) -> None:
+ """Handle UNSYNC command for all the given players.
+
+ Remove the given player from any syncgroups it currently is synced to.
+
+ - player_id: player_id of the player to handle the command.
+ """
+ # filter all player ids on compatibility and availability
+ final_player_ids: UniqueList[str] = UniqueList()
+ for player_id in player_ids:
+ if not (child_player := self.get(player_id)):
+ self.logger.warning("Player %s is not available", player_id)
+ continue
+ if PlayerFeature.SYNC not in child_player.supported_features:
+ self.logger.warning(
+ "Player %s does not support (un)sync commands", child_player.name
+ )
+ continue
+ final_player_ids.append(player_id)
+
+ if not final_player_ids:
return
- # all checks passed, forward command to the player provider
- player_provider = self.get_player_provider(player_id)
- await player_provider.cmd_unsync(player_id)
- # reset active_source just in case
- player.active_source = None
+ # forward command to the player provider after all (base) sanity checks
+ player_provider = self.get_player_provider(final_player_ids[0])
+ await player_provider.cmd_unsync_many(final_player_ids)
@api_command("players/create_group")
async def create_group(self, provider: str, name: str, members: list[str]) -> Player:
msg = f"Provider {player_prov.name} does not support creating groups"
raise UnsupportedFeaturedException(msg)
+ def set(self, player: Player) -> None:
+ """Set/Update player details on the controller."""
+ if player.player_id not in self._players:
+ # new player
+ self.register(player)
+ return
+ self._players[player.player_id] = player
+ self.update(player.player_id)
+
+ def register(self, player: Player) -> None:
+ """Register a new player on the controller."""
+ if self.mass.closing:
+ return
+ player_id = player.player_id
+
+ if player_id in self._players:
+ msg = f"Player {player_id} is already registered"
+ raise AlreadyRegisteredError(msg)
+
+ # make sure that the player's provider is set to the instance id
+ if prov := self.mass.get_provider(player.provider):
+ player.provider = prov.instance_id
+ else:
+ raise RuntimeError("Invalid provider ID given: %s", player.provider)
+
+ # make sure a default config exists
+ self.mass.config.create_default_player_config(
+ player_id, player.provider, player.name, player.enabled_by_default
+ )
+
+ player.enabled = self.mass.config.get(f"{CONF_PLAYERS}/{player_id}/enabled", True)
+
+ # register playerqueue for this player
+ self.mass.create_task(self.mass.player_queues.on_player_register(player))
+
+ self._players[player_id] = player
+
+ # ignore disabled players
+ if not player.enabled:
+ return
+
+ # initialize sync groups as soon as a player is registered
+ self.mass.loop.create_task(self._register_syncgroups())
+
+ self.logger.info(
+ "Player registered: %s/%s",
+ player_id,
+ player.name,
+ )
+ self.mass.signal_event(EventType.PLAYER_ADDED, object_id=player.player_id, data=player)
+ # always call update to fix special attributes like display name, group volume etc.
+ self.update(player.player_id)
+
+ def register_or_update(self, player: Player) -> None:
+ """Register a new player on the controller or update existing one."""
+ if self.mass.closing:
+ return
+
+ if player.player_id in self._players:
+ self._players[player.player_id] = player
+ self.update(player.player_id)
+ return
+
+ self.register(player)
+
+ def remove(self, player_id: str, cleanup_config: bool = True) -> None:
+ """Remove a player from the registry."""
+ player = self._players.pop(player_id, None)
+ if player is None:
+ return
+ self.logger.info("Player removed: %s", player.name)
+ self.mass.player_queues.on_player_remove(player_id)
+ if cleanup_config:
+ self.mass.config.remove(f"players/{player_id}")
+ self._prev_states.pop(player_id, None)
+ self.mass.signal_event(EventType.PLAYER_REMOVED, player_id)
+
+ def update(
+ self, player_id: str, skip_forward: bool = False, force_update: bool = False
+ ) -> None:
+ """Update player state."""
+ if self.mass.closing:
+ return
+ if player_id not in self._players:
+ return
+ player = self._players[player_id]
+ # calculate active group and active source
+ player.active_group = self._get_active_player_group(player)
+ player.active_source = self._get_active_source(player)
+ player.volume_level = player.volume_level or 0 # guard for None volume
+ # calculate group volume
+ player.group_volume = self._get_group_volume_level(player)
+ if player.type in (PlayerType.GROUP, PlayerType.SYNC_GROUP):
+ player.volume_level = player.group_volume
+ # prefer any overridden name from config
+ player.display_name = (
+ self.mass.config.get_raw_player_config_value(player.player_id, "name")
+ or player.name
+ or player.player_id
+ )
+ if (
+ not player.powered
+ and player.state == PlayerState.PLAYING
+ and PlayerFeature.POWER not in player.supported_features
+ and player.active_source == player_id
+ ):
+ # mark player as powered if its playing
+ # could happen for players that do not officially support power commands
+ player.powered = True
+ player.hidden = self.mass.config.get_raw_player_config_value(
+ player.player_id, CONF_HIDE_PLAYER, False
+ )
+ player.icon = self.mass.config.get_raw_player_config_value(
+ player.player_id,
+ CONF_ENTRY_PLAYER_ICON.key,
+ CONF_ENTRY_PLAYER_ICON_GROUP.default_value
+ if player.type in (PlayerType.GROUP, PlayerType.SYNC_GROUP)
+ else CONF_ENTRY_PLAYER_ICON.default_value,
+ )
+ # handle syncgroup - get attributes from first player that has this group as source
+ if player.player_id.startswith(SYNCGROUP_PREFIX):
+ if player.powered and (sync_leader := self.get_sync_leader(player)):
+ player.state = sync_leader.state
+ player.current_item_id = sync_leader.current_item_id
+ player.elapsed_time = sync_leader.elapsed_time
+ player.elapsed_time_last_updated = sync_leader.elapsed_time_last_updated
+ else:
+ player.state = PlayerState.IDLE
+
+ # basic throttle: do not send state changed events if player did not actually change
+ prev_state = self._prev_states.get(player_id, {})
+ new_state = self._players[player_id].to_dict()
+ changed_values = get_changed_values(
+ prev_state,
+ new_state,
+ ignore_keys=["elapsed_time", "elapsed_time_last_updated", "seq_no"],
+ )
+ self._prev_states[player_id] = new_state
+
+ if not player.enabled and not force_update:
+ # ignore updates for disabled players
+ return
+
+ # always signal update to the playerqueue
+ self.mass.player_queues.on_player_update(player, changed_values)
+
+ if len(changed_values) == 0 and not force_update:
+ return
+
+ self.mass.signal_event(EventType.PLAYER_UPDATED, object_id=player_id, data=player)
+
+ if skip_forward:
+ return
+ # update/signal group player(s) child's when group updates
+ if player.type in (PlayerType.GROUP, PlayerType.SYNC_GROUP):
+ for child_player in self.iter_group_members(player):
+ if child_player.player_id == player.player_id:
+ continue
+ self.update(child_player.player_id, skip_forward=True)
+ # update/signal group player(s) when child updates
+ for group_player in self._get_player_groups(player, powered_only=False):
+ player_prov = self.get_player_provider(group_player.player_id)
+ if not player_prov:
+ continue
+ if group_player.player_id.startswith(SYNCGROUP_PREFIX):
+ self.update(group_player.player_id, skip_forward=True)
+ else:
+ self.mass.create_task(player_prov.poll_player(group_player.player_id))
+
+ def get_player_provider(self, player_id: str) -> PlayerProvider:
+ """Return PlayerProvider for given player."""
+ player = self._players[player_id]
+ player_provider = self.mass.get_provider(player.provider)
+ return cast(PlayerProvider, player_provider)
+
def get_announcement_volume(self, player_id: str, volume_override: int | None) -> int | None:
"""Get the (player specific) volume for a announcement."""
volume_strategy = self.mass.config.get_raw_player_config_value(
from __future__ import annotations
import asyncio
-import inspect
import logging
import os
import urllib.parse
from concurrent import futures
from contextlib import suppress
from functools import partial
-from typing import TYPE_CHECKING, Any, Final
+from typing import TYPE_CHECKING, Final
from aiohttp import WSMsgType, web
from music_assistant_frontend import where as locate_frontend
from music_assistant.common.helpers.util import get_ip
from music_assistant.common.models.api import (
- ChunkedResultMessage,
CommandMessage,
ErrorResultMessage,
MessageType,
try:
args = parse_arguments(handler.signature, handler.type_hints, msg.args)
result = handler.target(**args)
- if inspect.isasyncgen(result):
- # async generator = send chunked response
- chunk_size = 100
- batch: list[Any] = []
- async for item in result:
- batch.append(item)
- if len(batch) == chunk_size:
- self._send_message(ChunkedResultMessage(msg.message_id, batch))
- batch = []
- # send last chunk
- self._send_message(ChunkedResultMessage(msg.message_id, batch, True))
- del batch
- return
if asyncio.iscoroutine(result):
result = await result
self._send_message(SuccessResultMessage(msg.message_id, result))
Artist,
BrowseFolder,
MediaItemType,
+ PagedItems,
Playlist,
Radio,
SearchResults,
if ProviderFeature.LIBRARY_ALBUMS in self.supported_features:
raise NotImplementedError
- async def get_playlist_tracks( # type: ignore[return]
- self, prov_playlist_id: str
- ) -> AsyncGenerator[Track, None]:
+ async def get_playlist_tracks(
+ self, prov_playlist_id: str, offset: int, limit: int
+ ) -> list[Track]:
"""Get all playlist tracks for given playlist id."""
if ProviderFeature.LIBRARY_PLAYLISTS in self.supported_features:
raise NotImplementedError
- yield # type: ignore
async def library_add(self, item: MediaItemType) -> bool:
"""Add item to provider's library. Return true on success."""
return await self.get_radio(prov_item_id)
return await self.get_track(prov_item_id)
- async def browse(self, path: str) -> AsyncGenerator[MediaItemType, None]:
+ async def browse(self, path: str, offset: int, limit: int) -> PagedItems[MediaItemType]:
"""Browse this provider's items.
:param path: The path to browse, (e.g. provider_id://artists).
if ProviderFeature.BROWSE not in self.supported_features:
# we may NOT use the default implementation if the provider does not support browse
raise NotImplementedError
-
+ items: list[MediaItemType] = []
+ index = 0
subpath = path.split("://", 1)[1]
# this reference implementation can be overridden with a provider specific approach
+ generator: AsyncGenerator[MediaItemType, None] | None = None
if subpath == "artists":
- async for artist in self.get_library_artists():
- yield artist
- return
- if subpath == "albums":
- async for album in self.get_library_albums():
- yield album
- return
- if subpath == "tracks":
- async for track in self.get_library_tracks():
- yield track
- return
- if subpath == "radios":
- async for radio in self.get_library_radios():
- yield radio
- return
- if subpath == "playlists":
- async for playlist in self.get_library_playlists():
- yield playlist
- return
- if subpath:
+ generator = self.get_library_artists()
+ elif subpath == "albums":
+ generator = self.get_library_albums()
+ elif subpath == "tracks":
+ generator = self.get_library_tracks()
+ elif subpath == "radios":
+ generator = self.get_library_radios()
+ elif subpath == "playlists":
+ generator = self.get_library_playlists()
+ elif subpath:
# unknown path
msg = "Invalid subpath"
raise KeyError(msg)
- # no subpath: return main listing
- if ProviderFeature.LIBRARY_ARTISTS in self.supported_features:
- yield BrowseFolder(
- item_id="artists",
- provider=self.domain,
- path=path + "artists",
- name="",
- label="artists",
- )
- if ProviderFeature.LIBRARY_ALBUMS in self.supported_features:
- yield BrowseFolder(
- item_id="albums",
- provider=self.domain,
- path=path + "albums",
- name="",
- label="albums",
- )
- if ProviderFeature.LIBRARY_TRACKS in self.supported_features:
- yield BrowseFolder(
- item_id="tracks",
- provider=self.domain,
- path=path + "tracks",
- name="",
- label="tracks",
- )
- if ProviderFeature.LIBRARY_PLAYLISTS in self.supported_features:
- yield BrowseFolder(
- item_id="playlists",
- provider=self.domain,
- path=path + "playlists",
- name="",
- label="playlists",
- )
- if ProviderFeature.LIBRARY_RADIOS in self.supported_features:
- yield BrowseFolder(
- item_id="radios",
- provider=self.domain,
- path=path + "radios",
- name="",
- label="radios",
- )
+
+ if generator:
+ # grab items from library generator
+ async for item in generator:
+ if index < offset:
+ continue
+ items.append(item)
+ index += 1
+ if len(items) >= limit:
+ break
+ else:
+ # no subpath: return main listing
+ if ProviderFeature.LIBRARY_ARTISTS in self.supported_features:
+ items.append(
+ BrowseFolder(
+ item_id="artists",
+ provider=self.domain,
+ path=path + "artists",
+ name="",
+ label="artists",
+ )
+ )
+ if ProviderFeature.LIBRARY_ALBUMS in self.supported_features:
+ items.append(
+ BrowseFolder(
+ item_id="albums",
+ provider=self.domain,
+ path=path + "albums",
+ name="",
+ label="albums",
+ )
+ )
+ if ProviderFeature.LIBRARY_TRACKS in self.supported_features:
+ items.append(
+ BrowseFolder(
+ item_id="tracks",
+ provider=self.domain,
+ path=path + "tracks",
+ name="",
+ label="tracks",
+ )
+ )
+ if ProviderFeature.LIBRARY_PLAYLISTS in self.supported_features:
+ items.append(
+ BrowseFolder(
+ item_id="playlists",
+ provider=self.domain,
+ path=path + "playlists",
+ name="",
+ label="playlists",
+ )
+ )
+ if ProviderFeature.LIBRARY_RADIOS in self.supported_features:
+ items.append(
+ BrowseFolder(
+ item_id="radios",
+ provider=self.domain,
+ path=path + "radios",
+ name="",
+ label="radios",
+ )
+ )
+ return PagedItems(items=items, limit=limit, offset=offset)
async def recommendations(self) -> list[MediaItemType]:
"""Get this provider's recommendations.
from __future__ import annotations
+import asyncio
from abc import abstractmethod
from collections.abc import Iterable
# will only be called for players with SYNC feature set.
raise NotImplementedError
+ async def cmd_sync_many(self, target_player: str, child_player_ids: list[str]) -> None:
+ """Create temporary sync group by joining given players to target player."""
+ # default implementation, simply call the cmd_sync for all child players
+ async with asyncio.TaskGroup() as tg:
+ for child_player_id in child_player_ids:
+ tg.create_task(self.cmd_sync(child_player_id, target_player))
+
+ async def cmd_unsync_many(self, player_ids: str) -> None:
+ """Handle UNSYNC command for all the given players.
+
+ Remove the given player from any syncgroups it currently is synced to.
+
+ - player_id: player_id of the player to handle the command.
+ """
+ # default implementation, simply call the cmd_sync for all player_ids
+ async with asyncio.TaskGroup() as tg:
+ for player_id in player_ids:
+ tg.create_task(self.cmd_unsync(player_id))
+
async def create_group(self, name: str, members: list[str]) -> Player:
"""Create new PlayerGroup on this provider.
from __future__ import annotations
import logging
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any
from music_assistant.constants import CONF_LOG_LEVEL, MASS_LOGGER_NAME
from music_assistant.server.helpers.throttle_retry import ThrottlerManager
from music_assistant.common.models.config_entries import ProviderConfig
from music_assistant.common.models.enums import ProviderFeature, ProviderType
- from music_assistant.common.models.provider import ProviderInstance, ProviderManifest
+ from music_assistant.common.models.provider import ProviderManifest
from music_assistant.server import MusicAssistant
return f"{self.manifest.name}.{postfix}"
return self.manifest.name
- def to_dict(self, *args, **kwargs) -> ProviderInstance:
+ def to_dict(self, *args, **kwargs) -> dict[str, Any]:
"""Return Provider(instance) as serializable dict."""
return {
"type": self.type.value,
self.mass.config.set(key, stored_items)
return True
- async def get_playlist_tracks(self, prov_playlist_id: str) -> AsyncGenerator[Track, None]:
- # handle built-in playlists
- """Get all playlist tracks for given playlist id."""
+ async def get_playlist_tracks(
+ self, prov_playlist_id: str, offset: int, limit: int
+ ) -> list[Track]:
+ """Get playlist tracks."""
if prov_playlist_id in BUILTIN_PLAYLISTS:
- async for item in self._get_builtin_playlist_tracks(prov_playlist_id):
- yield item
- return
+ return await self._get_builtin_playlist_tracks(prov_playlist_id)
# user created universal playlist
+ result: list[Track] = []
playlist_items = await self._read_playlist_file_items(prov_playlist_id)
- for count, uri in enumerate(playlist_items):
+ for index, uri in enumerate(playlist_items[offset:limit]):
try:
# get the provider item and not the full track from a regular 'get' call
# as we only need basic track info here
track = await media_controller.get_provider_item(
item_id, provider_instance_id_or_domain
)
- track.position = count
- yield track
+ track.position = offset + index
+ result.append(track)
except (MediaNotFoundError, InvalidDataError, ProviderUnavailableError) as err:
self.logger.warning("Skipping item in playlist: %s:%s", uri, str(err))
+ return result
async def add_playlist_tracks(self, prov_playlist_id: str, prov_track_ids: list[str]) -> None:
"""Add track(s) to playlist."""
self, builtin_playlist_id: str
) -> AsyncGenerator[Track, None]:
"""Get all playlist tracks for given builtin playlist id."""
+ result: list[Track] = []
if builtin_playlist_id == ALL_LIBRARY_TRACKS:
- res = await self.mass.music.tracks.library_items(limit=2500, order_by="RANDOM()")
- for idx, item in enumerate(res.items, 1):
+ db_result = await self.mass.music.tracks.library_items(limit=2500, order_by="RANDOM()")
+ for idx, item in enumerate(db_result.items):
item.position = idx
- yield item
- return
+ result.append(item)
+ return result
if builtin_playlist_id == ALL_FAVORITE_TRACKS:
res = await self.mass.music.tracks.library_items(
favorite=True, limit=2500, order_by="RANDOM()"
)
for idx, item in enumerate(res.items, 1):
item.position = idx
- yield item
- return
+ result.append(item)
+ return result
if builtin_playlist_id == RANDOM_TRACKS:
res = await self.mass.music.tracks.library_items(limit=100, order_by="RANDOM()")
for idx, item in enumerate(res.items, 1):
item.position = idx
- yield item
- return
+ result.append(item)
+ return result
if builtin_playlist_id == RANDOM_ALBUM:
for random_album in (
await self.mass.music.albums.library_items(limit=1, order_by="RANDOM()")
)
for idx, track in enumerate(tracks, 1):
track.position = idx
- yield track
- return
+ result.append(track)
+ return result
if builtin_playlist_id == RANDOM_ARTIST:
for random_artist in (
await self.mass.music.artists.library_items(limit=1, order_by="RANDOM()")
)
for idx, track in enumerate(tracks, 1):
track.position = idx
- yield track
- return
+ result.append(track)
+ return result
if builtin_playlist_id == RECENTLY_PLAYED:
tracks = await self.mass.music.recently_played(250, [MediaType.TRACK])
for idx, track in enumerate(tracks, 1):
track.position = idx
- yield track
- return
+ result.append(track)
+ return result
+ return result
async def _read_playlist_file_items(self, playlist_id: str) -> list[str]:
"""Return lines of a playlist file."""
for count, deezer_track in enumerate(await album.get_tracks(), 1)
]
- async def get_playlist_tracks(self, prov_playlist_id: str) -> AsyncGenerator[Track, None]:
- """Get all tracks in a playlist."""
+ async def get_playlist_tracks(
+ self, prov_playlist_id: str, offset: int, limit: int
+ ) -> list[Track]:
+ """Get playlist tracks."""
+ result: list[Track] = []
+ # TODO: implement pagination!
playlist = await self.client.get_playlist(int(prov_playlist_id))
- for count, deezer_track in enumerate(await playlist.get_tracks(), 1):
- yield self.parse_track(
- track=deezer_track, user_country=self.gw_client.user_country, position=count
+ for index, deezer_track in enumerate(await playlist.get_tracks()):
+ result.append(
+ self.parse_track(
+ track=deezer_track,
+ user_country=self.gw_client.user_country,
+ position=offset + index,
+ )
)
+ return result
async def get_artist_albums(self, prov_artist_id: str) -> list[Album]:
"""Get albums by an artist."""
) -> None:
"""Remove track(s) from playlist."""
playlist_track_ids = []
- async for track in self.get_playlist_tracks(prov_playlist_id):
+ for track in await self.get_playlist_tracks(prov_playlist_id, 0, 10000):
if track.position in positions_to_remove:
playlist_track_ids.append(int(track.item_id))
if len(playlist_track_ids) == len(positions_to_remove):
MediaItemImage,
MediaItemType,
MediaType,
+ PagedItems,
Playlist,
ProviderMapping,
SearchResults,
)
return result
- async def browse(self, path: str) -> AsyncGenerator[MediaItemType, None]:
+ async def browse(self, path: str, offset: int, limit: int) -> PagedItems[MediaItemType]:
"""Browse this provider's items.
:param path: The path to browse, (e.g. provid://artists).
"""
+ items: list[MediaItemType] = []
item_path = path.split("://", 1)[1]
if not item_path:
item_path = ""
+ index = 0
async for item in self.listdir(item_path, recursive=False):
- if item.is_dir:
- yield BrowseFolder(
- item_id=item.path,
- provider=self.instance_id,
- path=f"{self.instance_id}://{item.path}",
- name=item.filename,
- )
+ if not item.is_dir and "." not in item.filename or not item.ext:
+ # skip system files and files without extension
continue
- if "." not in item.filename or not item.ext:
- # skip system files and files without extension
+ if index < offset:
continue
- if item.ext in TRACK_EXTENSIONS:
- yield ItemMapping(
- media_type=MediaType.TRACK,
- item_id=item.path,
- provider=self.instance_id,
- name=item.filename,
+ if item.is_dir:
+ items.append(
+ BrowseFolder(
+ item_id=item.path,
+ provider=self.instance_id,
+ path=f"{self.instance_id}://{item.path}",
+ name=item.filename,
+ )
)
- continue
- if item.ext in PLAYLIST_EXTENSIONS:
- yield ItemMapping(
- media_type=MediaType.PLAYLIST,
- item_id=item.path,
- provider=self.instance_id,
- name=item.filename,
+ elif item.ext in TRACK_EXTENSIONS:
+ items.append(
+ ItemMapping(
+ media_type=MediaType.TRACK,
+ item_id=item.path,
+ provider=self.instance_id,
+ name=item.filename,
+ )
)
+ elif item.ext in PLAYLIST_EXTENSIONS:
+ items.append(
+ ItemMapping(
+ media_type=MediaType.PLAYLIST,
+ item_id=item.path,
+ provider=self.instance_id,
+ name=item.filename,
+ )
+ )
+ index += 1
+ if len(items) >= limit:
+ break
+ return PagedItems(items=items, limit=limit, offset=offset)
async def sync_library(self, media_types: tuple[MediaType, ...]) -> None:
"""Run library sync for this provider."""
if any(x.provider_instance == self.instance_id for x in track.provider_mappings)
]
- async def get_playlist_tracks(self, prov_playlist_id: str) -> AsyncGenerator[Track, None]:
- """Get playlist tracks for given playlist id."""
+ async def get_playlist_tracks(
+ self, prov_playlist_id: str, offset: int, limit: int
+ ) -> list[Track]:
+ """Get playlist tracks."""
+ result: list[Track] = []
if not await self.exists(prov_playlist_id):
msg = f"Playlist path does not exist: {prov_playlist_id}"
raise MediaNotFoundError(msg)
else:
playlist_lines = parse_pls(playlist_data)
- for line_no, playlist_line in enumerate(playlist_lines, 0):
+ playlist_lines = playlist_lines[offset:limit]
+
+ for line_no, playlist_line in enumerate(playlist_lines):
if track := await self._parse_playlist_line(
playlist_line.path, os.path.dirname(prov_playlist_id)
):
- track.position = line_no
- yield track
+ track.position = offset + line_no
+ result.append(track)
except Exception as err: # pylint: disable=broad-except
self.logger.warning(
str(err),
exc_info=err if self.logger.isEnabledFor(10) else None,
)
+ return result
async def _parse_playlist_line(self, line: str, playlist_path: str) -> Track | None:
"""Try to parse a track from a playlist line."""
msg = f"Item {prov_playlist_id} not found"\r
raise MediaNotFoundError(msg)\r
\r
- async def get_playlist_tracks( # type: ignore[return]\r
- self, prov_playlist_id: str\r
- ) -> AsyncGenerator[Track, None]:\r
- """Get all playlist tracks for given playlist id."""\r
+ async def get_playlist_tracks(\r
+ self, prov_playlist_id: str, offset: int, limit: int\r
+ ) -> list[Track]:\r
+ """Get playlist tracks."""\r
+ result: list[Track] = []\r
jellyfin_playlist = API.get_item(self._jellyfin_server.jellyfin, prov_playlist_id)\r
-\r
playlist_items = await self._get_children(\r
self._jellyfin_server, jellyfin_playlist[ITEM_KEY_ID], ITEM_TYPE_AUDIO\r
)\r
- for index, jellyfin_track in enumerate(playlist_items or [], 1):\r
+ if not playlist_items:\r
+ return result\r
+ for index, jellyfin_track in enumerate(playlist_items):\r
if track := await self._parse_track(jellyfin_track):\r
if not track.position:\r
track.position = index\r
- yield track\r
+ result.append(track)\r
+ return result\r
\r
async def get_artist_albums(self, prov_artist_id) -> list[Album]:\r
"""Get a list of albums for the given artist."""\r
raise MediaNotFoundError(msg) from e
return self._parse_playlist(sonic_playlist)
- async def get_playlist_tracks(self, prov_playlist_id) -> AsyncGenerator[Track, None]:
- """Provide a generator for the tracks on a specified Playlist."""
+ async def get_playlist_tracks(
+ self, prov_playlist_id: str, offset: int, limit: int
+ ) -> list[Track]:
+ """Get playlist tracks."""
+ result: list[Track] = []
try:
sonic_playlist: SonicPlaylist = await self._run_async(
self._conn.getPlaylist, prov_playlist_id
for index, sonic_song in enumerate(sonic_playlist.songs):
track = self._parse_track(sonic_song)
track.position = index
- yield track
+ result.append(track)
+ return result
async def get_artist_toptracks(self, prov_artist_id: str) -> list[Track]:
"""Get the top listed tracks for a specified artist."""
msg = f"Item {prov_playlist_id} not found"
raise MediaNotFoundError(msg)
- async def get_playlist_tracks( # type: ignore[return]
- self, prov_playlist_id: str
- ) -> AsyncGenerator[Track, None]:
- """Get all playlist tracks for given playlist id."""
+ async def get_playlist_tracks(
+ self, prov_playlist_id: str, offset: int, limit: int
+ ) -> list[Track]:
+ """Get playlist tracks."""
+ result: list[Track] = []
+ # TODO: implement paging ?!
plex_playlist: PlexPlaylist = await self._get_data(prov_playlist_id, PlexPlaylist)
- playlist_items = await self._run_async(plex_playlist.items)
-
- for index, plex_track in enumerate(playlist_items or []):
+ if not (playlist_items := await self._run_async(plex_playlist.items)):
+ return result
+ for index, plex_track in enumerate(playlist_items):
if track := await self._parse_track(plex_track):
track.position = index
- yield track
+ result.append(track)
+ return result
async def get_artist_albums(self, prov_artist_id) -> list[Album]:
"""Get a list of albums for the given artist."""
if (item and item["id"])
]
- async def get_playlist_tracks(self, prov_playlist_id) -> AsyncGenerator[Track, None]:
- """Get all playlist tracks for given playlist id."""
- count = 1
- for track_obj in await self._get_all_items(
+ async def get_playlist_tracks(
+ self, prov_playlist_id: str, offset: int, limit: int
+ ) -> list[Track]:
+ """Get playlist tracks."""
+ result: list[Track] = []
+ qobuz_result = await self._get_data(
"playlist/get",
key="tracks",
playlist_id=prov_playlist_id,
extra="tracks",
- ):
+ offset=offset,
+ limit=limit,
+ )
+ for index, track_obj in enumerate(qobuz_result["tracks"]["items"]):
if not (track_obj and track_obj["id"]):
continue
track = await self._parse_track(track_obj)
- track.position = count
- yield track
- count += 1
+ track.position = index + offset
+ result.append(track)
+ return result
async def get_artist_albums(self, prov_artist_id) -> list[Album]:
"""Get a list of albums for the given artist."""
) -> None:
"""Remove track(s) from playlist."""
playlist_track_ids = set()
- async for track in self.get_playlist_tracks(prov_playlist_id):
- if track.position in positions_to_remove:
+ for pos in positions_to_remove:
+ for track in await self.get_playlist_tracks(prov_playlist_id, pos, pos):
playlist_track_ids.add(str(track["playlist_track_id"]))
- if len(playlist_track_ids) == positions_to_remove:
- break
return await self._get_data(
"playlist/deleteTracks",
playlist_id=prov_playlist_id,
MediaItemLink,
MediaItemType,
MediaType,
+ PagedItems,
ProviderMapping,
Radio,
SearchResults,
SUPPORTED_FEATURES = (ProviderFeature.SEARCH, ProviderFeature.BROWSE)
if TYPE_CHECKING:
- from collections.abc import AsyncGenerator
-
from music_assistant.common.models.config_entries import (
ConfigEntry,
ConfigValueType,
return result
- async def browse(self, path: str) -> AsyncGenerator[MediaItemType, None]:
+ async def browse(self, path: str, limit: int, offset: int) -> PagedItems[MediaItemType]:
"""Browse this provider's items.
:param path: The path to browse, (e.g. provid://artists).
"""
+ items: list[BrowseFolder | Radio] = []
subpath = path.split("://", 1)[1]
subsubpath = "" if "/" not in subpath else subpath.split("/")[-1]
if not subpath:
# return main listing
- yield BrowseFolder(
- item_id="popular",
- provider=self.domain,
- path=path + "popular",
- name="",
- label="radiobrowser_by_popularity",
- )
- yield BrowseFolder(
- item_id="country",
- provider=self.domain,
- path=path + "country",
- name="",
- label="radiobrowser_by_country",
- )
- yield BrowseFolder(
- item_id="tag",
- provider=self.domain,
- path=path + "tag",
- name="",
- label="radiobrowser_by_tag",
- )
- return
+ items = [
+ BrowseFolder(
+ item_id="popular",
+ provider=self.domain,
+ path=path + "popular",
+ name="",
+ label="radiobrowser_by_popularity",
+ ),
+ BrowseFolder(
+ item_id="country",
+ provider=self.domain,
+ path=path + "country",
+ name="",
+ label="radiobrowser_by_country",
+ ),
+ BrowseFolder(
+ item_id="tag",
+ provider=self.domain,
+ path=path + "tag",
+ name="",
+ label="radiobrowser_by_tag",
+ ),
+ ]
if subpath == "popular":
- for item in await self.get_by_popularity():
- yield item
- return
+ items = await self.get_by_popularity()
if subpath == "tag":
tags = await self.radios.tags(
reverse=True,
)
tags.sort(key=lambda tag: tag.name)
- for tag in tags:
- yield BrowseFolder(
+ items = [
+ BrowseFolder(
item_id=tag.name.lower(),
provider=self.domain,
path=path + "/" + tag.name.lower(),
name=tag.name,
)
- return
+ for tag in tags
+ ]
if subpath == "country":
for country in await self.radios.countries(order=Order.NAME):
remotely_accessible=True,
)
]
- yield folder
- return
+ items.append(folder)
if subsubpath in await self.get_tag_names():
- for item in await self.get_by_tag(subsubpath):
- yield item
- return
+ items = await self.get_by_tag(subsubpath)
if subsubpath in await self.get_country_codes():
- for item in await self.get_by_country(subsubpath):
- yield item
+ items = await self.get_by_country(subsubpath)
+ return PagedItems(items=items, limit=limit, offset=offset)
async def get_tag_names(self):
"""Get a list of tag names."""
self.logger.debug("Parse playlist failed: %s", playlist_obj, exc_info=error)
return playlist
- async def get_playlist_tracks(self, prov_playlist_id) -> AsyncGenerator[Track, None]:
- """Get all playlist tracks for given playlist id."""
+ async def get_playlist_tracks(
+ self, prov_playlist_id: str, offset: int, limit: int
+ ) -> list[Track]:
+ """Get playlist tracks."""
+ result: list[Track] = []
+ # TODO: soundcloud doesn't seem to support paging for playlist tracks ?!
playlist_obj = await self._soundcloud.get_playlist_details(playlist_id=prov_playlist_id)
if "tracks" not in playlist_obj:
- return
+ return result
for index, item in enumerate(playlist_obj["tracks"]):
song = await self._soundcloud.get_track_details(item["id"])
try:
- if track := await self._parse_track(song[0], index + 1):
- yield track
+ # TODO: is it really needed to grab the entire track with an api call ?
+ if track := await self._parse_track(song[0], index + offset):
+ result.append(track)
except (KeyError, TypeError, InvalidDataError, IndexError) as error:
self.logger.debug("Parse track failed: %s", song, exc_info=error)
continue
+ return result
async def get_artist_toptracks(self, prov_artist_id) -> list[Track]:
"""Get a list of 25 most popular tracks for the given artist."""
if item["id"]
]
- async def get_playlist_tracks(self, prov_playlist_id) -> AsyncGenerator[Track, None]:
- """Get all playlist tracks for given playlist id."""
- count = 1
+ async def get_playlist_tracks(
+ self, prov_playlist_id: str, offset: int, limit: int
+ ) -> list[Track]:
+ """Get playlist tracks."""
+ result: list[Track] = []
uri = (
"me/tracks"
if prov_playlist_id == self._get_liked_songs_playlist_id()
else f"playlists/{prov_playlist_id}/tracks"
)
- for item in await self._get_all_items(
- uri,
- ):
+ spotify_result = await self._get_data(uri, limit=limit, offset=offset)
+ for index, item in enumerate(spotify_result["items"]):
if not (item and item["track"] and item["track"]["id"]):
continue
# use count as position
track = self._parse_track(item["track"])
- track.position = count
- yield track
- count += 1
+ track.position = offset + index
+ result.append(track)
+ return result
async def get_artist_albums(self, prov_artist_id) -> list[Album]:
"""Get a list of all albums for the given artist."""
) -> None:
"""Remove track(s) from playlist."""
track_uris = []
- async for track in self.get_playlist_tracks(prov_playlist_id):
- if track.position in positions_to_remove:
+ for pos in positions_to_remove:
+ for track in await self.get_playlist_tracks(prov_playlist_id, pos, pos):
track_uris.append({"uri": f"spotify:track:{track.item_id}"})
- if len(track_uris) == positions_to_remove:
- break
data = {"tracks": track_uris}
return await self._delete_data(f"playlists/{prov_playlist_id}/tracks", data=data)
artist_toptracks_obj = await get_artist_toptracks(tidal_session, prov_artist_id)
return [self._parse_track(track) for track in artist_toptracks_obj]
- async def get_playlist_tracks(self, prov_playlist_id: str) -> AsyncGenerator[Track, None]:
- """Get all playlist tracks for given playlist id."""
+ async def get_playlist_tracks(
+ self, prov_playlist_id: str, offset: int, limit: int
+ ) -> list[Track]:
+ """Get playlist tracks."""
tidal_session = await self._get_tidal_session()
- total_playlist_tracks = 0
+ result: list[Track] = []
track_obj: TidalTrack # satisfy the type checker
- async for track_obj in self._iter_items(
- get_playlist_tracks, tidal_session, prov_playlist_id, limit=DEFAULT_LIMIT
+ for index, track_obj in enumerate(
+ await get_playlist_tracks(tidal_session, prov_playlist_id, limit=limit, offset=offset)
):
- total_playlist_tracks += 1
track = self._parse_track(track_obj=track_obj)
- track.position = total_playlist_tracks
- yield track
+ track.position = offset + index
+ result.append(track)
+ return result
@throttle_with_retries
async def get_similar_tracks(self, prov_track_id: str, limit: int = 25) -> list[Track]:
"""Remove track(s) from playlist."""
prov_track_ids = []
tidal_session = await self._get_tidal_session()
- async for track in self.get_playlist_tracks(prov_playlist_id):
+ for track in await self.get_playlist_tracks(prov_playlist_id, 0, 10000):
if track.position in positions_to_remove:
prov_track_ids.append(track.item_id)
if len(prov_track_ids) == len(positions_to_remove):
msg = f"Item {prov_playlist_id} not found"
raise MediaNotFoundError(msg)
- async def get_playlist_tracks(self, prov_playlist_id) -> AsyncGenerator[Track, None]:
- """Get all playlist tracks for given playlist id."""
+ async def get_playlist_tracks(
+ self, prov_playlist_id: str, offset: int, limit: int
+ ) -> list[Track]:
+ """Return playlist tracks for the given provider playlist id."""
await self._check_oauth_token()
# Grab the playlist id from the full url in case of personal playlists
if YT_PLAYLIST_ID_DELIMITER in prov_playlist_id:
)
except KeyError as ke:
self.logger.warning("Could not load playlist: %s: %s", prov_playlist_id, ke)
- return
+ return None
if "tracks" not in playlist_obj:
- return
+ return None
+ result = []
for index, track_obj in enumerate(playlist_obj["tracks"]):
if track_obj["isAvailable"]:
# Playlist tracks sometimes do not have a valid artist id
try:
if track := await self._parse_track(track_obj):
track.position = index + 1
- yield track
+ result.append(track)
except InvalidDataError:
if track := await self.get_track(track_obj["videoId"]):
track.position = index + 1
- yield track
+ result.append(track)
+ # YTM doesn't seem to support paging so we ignore offset and limit
+ return result
async def get_artist_albums(self, prov_artist_id) -> list[Album]:
"""Get a list of albums for the given artist."""
artist_obj = await get_artist(prov_artist_id=prov_artist_id, headers=self._headers)
if artist_obj.get("songs") and artist_obj["songs"].get("browseId"):
prov_playlist_id = artist_obj["songs"]["browseId"]
- playlist_tracks = [
- x async for x in self.get_playlist_tracks(prov_playlist_id=prov_playlist_id)
- ]
+ playlist_tracks = await self.get_playlist_tracks(prov_playlist_id, 0, 0)
return playlist_tracks[:25]
return []
-"""Example script to test the MusicAssistant server and client."""
+"""Example script to test the MusicAssistant client."""
import argparse
-import asyncio
import logging
-import os
-from pathlib import Path
from aiorun import run
from music_assistant.client.client import MusicAssistantClient
-from music_assistant.server.server import MusicAssistant
# ruff: noqa: ANN201,PTH102,PTH112,PTH113,PTH118,PTH123,T201
-DEFAULT_PORT = 8095
-DEFAULT_URL = f"http://127.0.0.1:{DEFAULT_PORT}"
-DEFAULT_STORAGE_PATH = os.path.join(Path.home(), ".musicassistant")
logging.basicConfig(level=logging.DEBUG)
# Get parsed passed in arguments.
-parser = argparse.ArgumentParser(description="MusicAssistant Server Example.")
+parser = argparse.ArgumentParser(description="MusicAssistant Client Example.")
parser.add_argument(
- "--config",
+ "url",
type=str,
- default=DEFAULT_STORAGE_PATH,
- help="Storage path to keep persistent (configuration) data, "
- "defaults to {DEFAULT_STORAGE_PATH}",
+ help="URL of MASS server, e.g. http://localhost:8095",
)
parser.add_argument(
"--log-level",
# configure logging
logging.basicConfig(level=args.log_level.upper())
- # make sure storage path exists
- if not os.path.isdir(args.config):
- os.mkdir(args.config)
-
- # Init server
- server = MusicAssistant(args.config)
-
async def run_mass():
- """Run the MusicAssistant server and client."""
- # start MusicAssistant Server
- await server.start()
-
+ """Run the MusicAssistant client."""
# run the client
- async with MusicAssistantClient(DEFAULT_URL, None) as client:
+ async with MusicAssistantClient(args.url, None) as client:
# start listening
await client.start_listening()
- async def handle_stop(loop: asyncio.AbstractEventLoop): # noqa: ARG001
- """Handle server stop."""
- await server.stop()
-
# run the server
- run(run_mass(), shutdown_callback=handle_stop)
+ run(run_mass())