from __future__ import annotations
from dataclasses import dataclass
-from typing import Any
+from typing import TYPE_CHECKING, Any
from uuid import uuid4
from mashumaro import DataClassDictMixin
from .enums import MediaType
-from .media_items import ItemMapping, MediaItemImage, Radio, StreamDetails, Track
+from .media_items import ItemMapping, Radio, StreamDetails, Track
+
+if TYPE_CHECKING:
+ from music_assistant.server import MusicAssistant
@dataclass
sort_index: int = 0
streamdetails: StreamDetails | None = None
media_item: Track | Radio | None = None
- image: MediaItemImage | None = None
+ image_url: str | None = None
def __post_init__(self):
"""Set default values."""
name=name,
duration=media_item.duration,
media_item=media_item,
- image=media_item.image,
+ )
+
+ async def resolve_image_url(self, mass: MusicAssistant) -> None:
+ """Resolve Image URL for the MediaItem."""
+ if self.image_url:
+ return
+ if not self.media_item:
+ return
+ self.image_url = await mass.metadata.get_image_url_for_item(
+ self.media_item, resolve_local=True
)
img_path = await self.get_image_url_for_item(
media_item=media_item,
img_type=img_type,
- allow_local=True,
- local_as_base64=False,
)
if not img_path:
return None
self,
media_item: MediaItemType,
img_type: ImageType = ImageType.THUMB,
+ resolve_local: bool = True,
allow_local: bool = True,
- local_as_base64: bool = False,
) -> str | None:
"""Get url to image for given media media_item."""
if not media_item:
continue
if img.is_file and not allow_local:
continue
- if img.is_file and local_as_base64:
- # return base64 string of the image (compatible with browsers)
- return await self.get_thumbnail(img.url, base64=True)
+ if img.is_file and resolve_local:
+ # return imageproxy url for local filesystem items
+ # the original path is double encoded
+ encoded_url = urllib.parse.quote(urllib.parse.quote(img.url))
+ return f"{self.mass.base_url}/imageproxy?path={encoded_url}"
return img.url
# retry with track's album
if media_item.media_type == MediaType.TRACK and media_item.album:
- return await self.get_image_url_for_item(
- media_item.album, img_type, allow_local, local_as_base64
- )
+ return await self.get_image_url_for_item(media_item.album, img_type, resolve_local)
# try artist instead for albums
if media_item.media_type == MediaType.ALBUM and media_item.artist:
- return await self.get_image_url_for_item(
- media_item.artist, img_type, allow_local, local_as_base64
- )
+ return await self.get_image_url_for_item(media_item.artist, img_type, resolve_local)
# last resort: track artist(s)
if media_item.media_type == MediaType.TRACK and media_item.artists:
for artist in media_item.artists:
- return await self.get_image_url_for_item(
- artist, img_type, allow_local, local_as_base64
- )
+ return await self.get_image_url_for_item(artist, img_type, resolve_local)
return None
QueueOption,
RepeatMode,
)
-from music_assistant.common.models.errors import (
- MediaNotFoundError,
- MusicAssistantError,
- QueueEmpty,
-)
+from music_assistant.common.models.errors import MediaNotFoundError, MusicAssistantError, QueueEmpty
from music_assistant.common.models.media_items import MediaItemType, media_from_dict
from music_assistant.common.models.player_queue import PlayerQueue
from music_assistant.common.models.queue_item import QueueItem
-from music_assistant.constants import (
- CONF_FLOW_MODE,
- FALLBACK_DURATION,
- ROOT_LOGGER_NAME,
-)
+from music_assistant.constants import CONF_FLOW_MODE, FALLBACK_DURATION, ROOT_LOGGER_NAME
from music_assistant.server.helpers.api import api_command
if TYPE_CHECKING:
player_prov = self.mass.players.get_player_provider(queue_id)
flow_mode = self.mass.config.get_player_config_value(queue.queue_id, CONF_FLOW_MODE)
queue.flow_mode = flow_mode.value
+ # make sure that the queue item image is resolved
+ await queue_item.resolve_image_url(self.mass)
await player_prov.cmd_play_media(
queue_id,
queue_item=queue_item,
self._queues.pop(player_id, None)
self._queue_items.pop(player_id, None)
- def player_ready_for_next_track(
+ async def player_ready_for_next_track(
self, queue_or_player_id: str, current_item_id: str | None = None
) -> tuple[QueueItem, bool]:
"""Call when a player is ready to load the next track into the buffer.
next_item = self.get_item(queue.queue_id, next_index)
if not next_item:
raise QueueEmpty("No more tracks left in the queue.")
+ # make sure that the queue item image is resolved
+ await next_item.resolve_image_url(self.mass)
queue.index_in_buffer = next_index
# work out crossfade
crossfade = queue.crossfade_enabled
(
queue_track,
use_crossfade,
- ) = self.mass.players.queues.player_ready_for_next_track(
+ ) = await self.mass.players.queues.player_ready_for_next_track(
queue_id, queue_track.queue_item_id
)
except QueueEmpty:
"</item>"
"</DIDL-Lite>"
)
+
if is_radio:
# radio or other non-track item
- image = queue_item.image.url if queue_item.image else ""
+ image = queue_item.image_url if queue_item.image else ""
return (
'<DIDL-Lite xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:upnp="urn:schemas-upnp-org:metadata-1-0/upnp/" xmlns="urn:schemas-upnp-org:metadata-1-0/DIDL-Lite/" xmlns:dlna="urn:schemas-dlna-org:metadata-1-0/">'
f'<item id="{queue_item.queue_item_id}" parentID="0" restricted="1">'
f"<dc:title>{_escape_str(queue_item.name)}</dc:title>"
- f"<upnp:albumArtURI>{queue_item.image.url}</upnp:albumArtURI>"
+ f"<upnp:albumArtURI>{queue_item.image_url}</upnp:albumArtURI>"
f"<dc:queueItemId>{image}</dc:queueItemId>"
"<upnp:class>object.item.audioItem.audioBroadcast</upnp:class>"
f"<upnp:mimeType>audio/{ext}</upnp:mimeType>"
album = _escape_str(queue_item.media_item.album.name)
else:
album = ""
- image = queue_item.image.url if queue_item.image else ""
item_class = "object.item.audioItem.musicTrack"
duration_str = str(datetime.timedelta(seconds=queue_item.duration))
return (
f"<upnp:duration>{queue_item.duration}</upnp:duration>"
"<upnp:playlistTitle>Music Assistant</upnp:playlistTitle>"
f"<dc:queueItemId>{queue_item.queue_item_id}</dc:queueItemId>"
- f"<upnp:albumArtURI>{image}</upnp:albumArtURI>"
+ f"<upnp:albumArtURI>{queue_item.image_url}</upnp:albumArtURI>"
f"<upnp:class>{item_class}</upnp:class>"
f"<upnp:mimeType>audio/{ext}</upnp:mimeType>"
f'<res duration="{duration_str}" protocolInfo="http-get:*:audio/{ext}:DLNA.ORG_OP=00;DLNA.ORG_CI=0;DLNA.ORG_FLAGS=0d500000000000000000000000000000">{url}</res>'
if not current_queue_item_id:
return # guard
try:
- next_item, crossfade = self.mass.players.queues.player_ready_for_next_track(
+ next_item, crossfade = await self.mass.players.queues.player_ready_for_next_track(
castplayer.player_id, current_queue_item_id
)
except QueueEmpty:
"songName": queue_item.media_item.name,
"artist": queue_item.media_item.artist.name if queue_item.media_item.artist else "",
"title": queue_item.name,
- "images": [{"url": queue_item.image.url}] if queue_item.image else None,
+ "images": [{"url": queue_item.image_url}] if queue_item.image_url else None,
}
else:
stream_type = STREAM_TYPE_LIVE
metadata = {
"metadataType": 0,
"title": queue_item.name,
- "images": [{"url": queue_item.image.url}] if queue_item.image else None,
+ "images": [{"url": queue_item.image_url}] if queue_item.image_url else None,
}
return {
"autoplay": True,
if not self.mass.players.queues.get_item(dlna_player.udn, current_queue_item_id):
return # guard
try:
- next_item, crossfade = self.mass.players.queues.player_ready_for_next_track(
+ next_item, crossfade = await self.mass.players.queues.player_ready_for_next_track(
dlna_player.udn, current_queue_item_id
)
except QueueEmpty:
"genre": "",
"remote": 0,
"remote_title": queue_item.streamdetails.stream_title if queue_item.streamdetails else "",
- "artwork_url": queue_item.image.url if queue_item.image else "",
+ "artwork_url": queue_item.image_url or "",
"bitrate": "",
"duration": queue_item.duration or 0,
"coverid": "-94099753136392",
if not client.current_metadata:
return
try:
- next_item, crossfade = self.mass.players.queues.player_ready_for_next_track(
+ next_item, crossfade = await self.mass.players.queues.player_ready_for_next_track(
client.player_id, client.current_metadata["item_id"]
)
await self._handle_play_media(client, next_item, send_flush=False, crossfade=crossfade)
if not self.mass.players.queues.get_item(sonos_player.player_id, current_queue_item_id):
return # guard
try:
- next_item, crossfade = self.mass.players.queues.player_ready_for_next_track(
+ next_item, crossfade = await self.mass.players.queues.player_ready_for_next_track(
sonos_player.player_id, current_queue_item_id
)
except QueueEmpty: