from music_assistant.helpers.database import TABLE_THUMBS
from music_assistant.helpers.images import create_thumbnail
-from music_assistant.models.media_items import Album, Artist, Playlist, Radio, Track
+from music_assistant.models.enums import ImageType, MediaType
+from music_assistant.models.media_items import (
+ Album,
+ Artist,
+ ItemMapping,
+ MediaItemType,
+ Playlist,
+ Radio,
+ Track,
+)
from .audiodb import TheAudioDb
from .fanarttv import FanartTv
self.logger.warning("Unable to get musicbrainz ID for artist %s !", artist.name)
return None
+ async def get_image_data_for_item(
+ self,
+ media_item: MediaItemType,
+ img_type: ImageType = ImageType.THUMB,
+ size: Optional[int] = None,
+ ) -> bytes | None:
+ """Get image data for given MedaItem."""
+ img_path = await self.get_image_url_for_item(
+ media_item=media_item,
+ img_type=img_type,
+ allow_local=True,
+ local_as_base64=False,
+ )
+ if not img_path:
+ return None
+ return await self.get_thumbnail(img_path, size)
+
+ async def get_image_url_for_item(
+ self,
+ media_item: MediaItemType,
+ img_type: ImageType = ImageType.THUMB,
+ allow_local: bool = True,
+ local_as_base64: bool = True,
+ ) -> str | None:
+ """Get url to image for given media media_item."""
+ if not media_item:
+ return None
+ if isinstance(media_item, ItemMapping):
+ media_item = await self.mass.music.get_item_by_uri(media_item.uri)
+ if media_item and media_item.metadata.images:
+ for img in media_item.metadata.images:
+ if img.type != img_type:
+ continue
+ if img.is_file and not allow_local:
+ continue
+ if img.is_file and local_as_base64:
+ # return base64 string of the image (compatible with browsers)
+ return await self.get_thumbnail(img.url, base64=True)
+ return img.url
+
+ # retry with track's album
+ if media_item.media_type == MediaType.TRACK and media_item.album:
+ return await self.get_image_url_for_item(
+ media_item.album, img_type, allow_local, local_as_base64
+ )
+
+ # try artist instead for albums
+ if media_item.media_type == MediaType.ALBUM and media_item.artist:
+ return await self.get_image_url_for_item(
+ media_item.artist, img_type, allow_local, local_as_base64
+ )
+
+ # last resort: track artist(s)
+ if media_item.media_type == MediaType.TRACK and media_item.artists:
+ for artist in media_item.artists:
+ return await self.get_image_url_for_item(
+ artist, img_type, allow_local, local_as_base64
+ )
+
+ return None
+
async def get_thumbnail(
- self, path: str, size: Optional[int], base64: bool = False
+ self, path: str, size: Optional[int] = None, base64: bool = False
) -> bytes | str:
- """Get/create thumbnail image for path."""
+ """Get/create thumbnail image for path (image url or local path)."""
+ # check if we already have this cached in the db
match = {"path": path, "size": size}
if result := await self.mass.database.get_row(TABLE_THUMBS, match):
thumbnail = result["data"]
LOGGER.debug(
"Start analyzing track %s/%s",
- streamdetails.provider,
+ streamdetails.provider.value,
streamdetails.item_id,
)
# calculate BS.1770 R128 integrated loudness with ffmpeg
stdout=asyncio.subprocess.PIPE,
stdin=asyncio.subprocess.PIPE if audio_data else None,
)
- value, _ = await proc.communicate(audio_data or None)
- loudness = float(value.decode().strip())
- await mass.music.set_track_loudness(
- streamdetails.item_id, streamdetails.provider, loudness
- )
- LOGGER.debug(
- "Integrated loudness of %s/%s is: %s",
- streamdetails.provider,
- streamdetails.item_id,
- loudness,
- )
+ stdout, stderr = await proc.communicate(audio_data or None)
+ try:
+ loudness = float(stdout.decode().strip())
+ except ValueError: # pylint: disable=broad-except
+ LOGGER.warning(
+ "Could not determine integrated loudness of %s/%s - %s %s",
+ streamdetails.provider.value,
+ streamdetails.item_id,
+ stdout.decode(),
+ stderr.decode(),
+ )
+ else:
+ await mass.music.set_track_loudness(
+ streamdetails.item_id, streamdetails.provider, loudness
+ )
+ LOGGER.debug(
+ "Integrated loudness of %s/%s is: %s",
+ streamdetails.provider.value,
+ streamdetails.item_id,
+ loudness,
+ )
async def get_stream_details(
from PIL import Image
from tinytag import TinyTag
-from music_assistant.models.enums import ImageType, MediaType
-from music_assistant.models.media_items import ItemMapping, MediaItemType
-
if TYPE_CHECKING:
from music_assistant.mass import MusicAssistant
else:
# regular image file on disk
async with prov.open_file(path) as _file:
- img_data = BytesIO(await _file.read())
+ img_data = await _file.read()
break
if not img_data:
raise FileNotFoundError(f"Image not found: {path}")
return data.getvalue()
return await mass.loop.run_in_executor(None, _create_image)
-
-
-async def get_image_url(
- mass: MusicAssistant,
- media_item: MediaItemType,
- img_type: ImageType = ImageType.THUMB,
-) -> str:
- """Get url to image for given media media_item."""
- if not media_item:
- return None
- if isinstance(media_item, ItemMapping):
- media_item = await mass.music.get_item_by_uri(media_item.uri)
- if media_item and media_item.metadata.images:
- for img in media_item.metadata.images:
- if img.type == img_type:
- return img.url
-
- # retry with track's album
- if media_item.media_type == MediaType.TRACK and media_item.album:
- return await get_image_url(mass, media_item.album, img_type)
-
- # try artist instead for albums
- if media_item.media_type == MediaType.ALBUM and media_item.artist:
- return await get_image_url(mass, media_item.artist, img_type)
-
- # last resort: track artist(s)
- if media_item.media_type == MediaType.TRACK and media_item.artists:
- for artist in media_item.artists:
- return await get_image_url(mass, artist, img_type)
-
- return None
async def power(self, powered: bool) -> None:
"""Send POWER command to player."""
- if not self.use_multi_stream:
+ if self.use_multi_stream:
+ # redirect command to all child players
+ await asyncio.gather(
+ *[x.power(powered) for x in self._get_child_players(True)]
+ )
+ else:
return await super().power(powered)
- # redirect command to all child players
- await asyncio.gather(*[x.power(powered) for x in self._get_child_players(True)])
async def volume_set(self, volume_level: int) -> None:
"""Send volume level (0..100) command to player."""
def on_child_update(self, player_id: str, changed_keys: set) -> None:
"""Call when one of the child players of a playergroup updates."""
self.update_state(True)
- if "powered" in changed_keys:
- # convenience helper:
- # power off group player if last child player turns off
- powered_childs = set()
- for child_id in self._attr_group_childs:
- if player := self.mass.players.get_player(child_id):
- if player.powered:
- powered_childs.add(child_id)
- if self.powered and len(powered_childs) == 0:
- self.mass.create_task(self.power(False))
+
+ # convenience helper:
+ # power off group player if last child player turns off
+ if "powered" not in changed_keys or not self.active_queue.active:
+ return
+ powered_childs = set()
+ for child_id in self._attr_group_childs:
+ if player := self.mass.players.get_player(child_id):
+ if player.powered:
+ powered_childs.add(child_id)
+ if self.powered and len(powered_childs) == 0:
+
+ async def auto_turn_off_group():
+ await self.active_queue.stop()
+ await self.power(False)
+
+ self.mass.create_task(auto_turn_off_group())
self.queue_id = player_id
self._settings = QueueSettings(self)
self._current_index: Optional[int] = None
+ # index_in_buffer: which track is currently (pre)loaded in the streamer
+ self._index_in_buffer: Optional[int] = None
self._current_item_elapsed_time: int = 0
self._last_item: Optional[QueueItem] = None
- self._start_index: int = 0 # from which index did the queue start playing
- self._next_start_index: int = 0 # which index should the stream start
+ # start_index: from which index did the queuestream start playing
+ self._start_index: int = 0
+ self._next_start_index: int = 0
self._last_state = PlayerState.IDLE
self._items: List[QueueItem] = []
self._save_task: TimerHandle = None
self._update_task: Task = None
self._signal_next: bool = False
self._last_player_update: int = 0
+
self._stream_url: str = ""
async def setup(self) -> None:
"""Toggle play/pause on queue/player."""
if self.player.state == PlayerState.PLAYING:
await self.pause()
- else:
- await self.play()
+ return
+ await self.play()
async def next(self) -> None:
"""Play the next track in the queue."""
async def play_index(self, index: Union[int, str], passive: bool = False) -> None:
"""Play item at index (or item_id) X in queue."""
+ # power on player when requesting play
+ if not self.player.powered:
+ await self.player.power(True)
if self.player.use_multi_stream:
await self.mass.streams.stop_multi_client_queue_stream(self.queue_id)
if not isinstance(index, int):
items.insert(new_index, items.pop(item_index))
await self.update(items)
+ async def delete_item(self, queue_item_id: str) -> None:
+ """Delete item (by id or index) from the queue."""
+ item_index = self.index_by_id(queue_item_id)
+ if item_index <= self._index_in_buffer:
+ # ignore request if track already loaded in the buffer
+ # the frontend should guard so this is just in case
+ return
+ self._items.pop(item_index)
+ self.signal_update(True)
+
async def load(self, queue_items: List[QueueItem], passive: bool = False) -> None:
"""Load (overwrite) queue with new items."""
for index, item in enumerate(queue_items):
+ self._items[insert_at_index:]
)
- if offset == 0:
+ if offset in (0, self._index_in_buffer):
await self.play_index(insert_at_index, passive=passive)
self.signal_update(True)
self._current_index = start_from_index
self._start_index = start_from_index
self._next_start_index = self.get_next_index(start_from_index)
+ self._index_in_buffer = start_from_index
return start_from_index
async def queue_stream_next(self, cur_index: int) -> int | None:
"""Call when queue_streamer loads next track in buffer."""
next_idx = self._next_start_index
+ self._index_in_buffer = next_idx
self._next_start_index = self.get_next_index(self._next_start_index)
return next_idx