) -> ItemCls:
"""Return (full) details for a single media item."""
assert provider or provider_id, "provider or provider_id must be supplied"
+ if isinstance(provider, str):
+ provider = ProviderType(provider)
db_item = await self.get_db_item_by_prov_id(
provider_item_id=provider_item_id,
provider=provider,
) -> ItemCls | None:
"""Get the database item for the given prov_id."""
assert provider or provider_id, "provider or provider_id must be supplied"
+ if isinstance(provider, str):
+ provider = ProviderType(provider)
if provider == ProviderType.DATABASE or provider_id == "database":
return await self.get_db_item(provider_item_id, db=db)
for item in await self.get_db_items_by_prov_id(
) -> List[ItemCls]:
"""Fetch all records from database for given provider."""
assert provider or provider_id, "provider or provider_id must be supplied"
+ if isinstance(provider, str):
+ provider = ProviderType(provider)
if provider == ProviderType.DATABASE or provider_id == "database":
return await self.get_db_items(limit=limit, offset=offset, db=db)
)
from music_assistant.models.errors import MediaNotFoundError, MusicAssistantError
from music_assistant.models.event import MassEvent
+from music_assistant.models.media_items import MediaItemType, media_from_dict
from .player import Player, PlayerState, get_child_players
from .queue_item import QueueItem
async def play_media(
self,
- uris: str | List[str],
+ media: str | List[str] | MediaItemType | List[MediaItemType],
queue_opt: QueueOption = QueueOption.PLAY,
passive: bool = False,
) -> str:
"""
Play media item(s) on the given queue.
- :param uri: uri(s) that should be played (single item or list of uri's).
+ :param media: media(s) that should be played (MediaItem(s) or uri's).
:param queue_opt:
QueueOption.PLAY -> Insert new items in queue and start playing at inserted position
QueueOption.REPLACE -> Replace queue contents with these items
:param passive: if passive set to true the stream url will not be sent to the player.
"""
# a single item or list of items may be provided
- if not isinstance(uris, list):
- uris = [uris]
+ if not isinstance(media, list):
+ media = [media]
queue_items = []
- for uri in uris:
+ for item in media:
# parse provided uri into a MA MediaItem or Basic QueueItem from URL
- try:
- media_item = await self.mass.music.get_item_by_uri(uri)
- except MusicAssistantError as err:
- # invalid MA uri or item not found error
- if uri.startswith("http") or os.path.isfile(uri):
- # a plain url (or local file) was provided
- queue_items.append(QueueItem.from_url(uri))
- continue
- raise MediaNotFoundError(f"Invalid uri: {uri}") from err
+ if isinstance(item, str):
+ try:
+ media_item = await self.mass.music.get_item_by_uri(item)
+ except MusicAssistantError as err:
+ # invalid MA uri or item not found error
+ if item.startswith("http") or os.path.isfile(item):
+ # a plain url (or local file) was provided
+ queue_items.append(QueueItem.from_url(item))
+ continue
+ raise MediaNotFoundError(f"Invalid uri: {item}") from err
+ elif isinstance(item, dict):
+ media_item = media_from_dict(item)
+ else:
+ media_item = item
# collect tracks to play
+ tracks = []
if media_item.media_type == MediaType.ARTIST:
tracks = await self.mass.music.artists.toptracks(
media_item.item_id, provider=media_item.provider
tracks = await self.mass.music.playlists.tracks(
media_item.item_id, provider=media_item.provider
)
- elif media_item.media_type == MediaType.RADIO:
- # single radio
- tracks = [
- await self.mass.music.radio.get(
- media_item.item_id, provider=media_item.provider
- )
- ]
- else:
- # single track
- tracks = [
- await self.mass.music.tracks.get(
- media_item.item_id, provider=media_item.provider
- )
- ]
+ elif media_item.media_type in (
+ MediaType.RADIO,
+ MediaType.TRACK,
+ MediaType.URL,
+ ):
+ # single item
+ tracks = [media_item]
+
+ # only add available items
for track in tracks:
if not track.available:
continue