from databases import Database as Db
from music_assistant.helpers.compare import compare_album, compare_artist
-from music_assistant.helpers.database import TABLE_ALBUMS
+from music_assistant.helpers.database import TABLE_ALBUMS, TABLE_TRACKS
from music_assistant.helpers.json import json_serializer
from music_assistant.models.enums import EventType, ProviderType
from music_assistant.models.event import MassEvent
)
return db_item
+ async def delete_db_item(self, item_id: int, db: Optional[Db] = None) -> None:
+ """Delete record from the database."""
+
+ # delete tracks connected to this album
+ async with self.mass.database.get_db(db) as db:
+ await self.mass.database.delete_where_query(
+ TABLE_TRACKS, f"albums LIKE '%\"{item_id}\"%'", db=db
+ )
+ # delete the album itself from db
+ await super().delete_db_item(item_id, db)
+
+ self.logger.debug("deleted item with id %s from database", item_id)
+
async def _match(self, db_album: Album) -> None:
"""
Try to find matching album on all providers for the provided (database) album.
from databases import Database as Db
-from music_assistant.helpers.database import TABLE_ARTISTS
+from music_assistant.helpers.database import TABLE_ALBUMS, TABLE_ARTISTS, TABLE_TRACKS
from music_assistant.helpers.json import json_serializer
from music_assistant.models.enums import EventType, ProviderType
from music_assistant.models.event import MassEvent
)
return db_item
+ async def delete_db_item(self, item_id: int, db: Optional[Db] = None) -> None:
+ """Delete record from the database."""
+
+ # delete tracks/albums connected to this artist
+ async with self.mass.database.get_db(db) as db:
+ await self.mass.database.delete_where_query(
+ TABLE_TRACKS, f"artists LIKE '%\"{item_id}\"%'", db=db
+ )
+ await self.mass.database.delete_where_query(
+ TABLE_ALBUMS, f"artists LIKE '%\"{item_id}\"%'", db=db
+ )
+ # delete the artist itself from db
+ await super().delete_db_item(item_id, db)
+
+ self.logger.debug("deleted item with id %s from database", item_id)
+
async def _match(self, db_artist: Artist, provider: MusicProvider) -> bool:
"""Try to find matching artists on given provider for the provided (database) artist."""
self.logger.debug(
username: Optional[str] = None
password: Optional[str] = None
path: Optional[str] = None
- # no need to override the id unless you really know what you're doing ;-)
+ # if id is omitted, an id is generated/derived from the other params
id: Optional[str] = None
def __post_init__(self):
import asyncio
import random
from asyncio import Task, TimerHandle
-from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
-from uuid import uuid4
-
-from mashumaro import DataClassDictMixin
from music_assistant.helpers.audio import get_stream_details
from music_assistant.models.enums import (
ContentType,
- CrossFadeMode,
EventType,
MediaType,
QueueOption,
)
from music_assistant.models.errors import MediaNotFoundError, QueueEmpty
from music_assistant.models.event import MassEvent
-from music_assistant.models.media_items import Radio, StreamDetails, Track
+from music_assistant.models.media_items import StreamDetails
from .player import Player, PlayerGroup, PlayerState
+from .queue_item import QueueItem
+from .queue_settings import QueueSettings
if TYPE_CHECKING:
from music_assistant.mass import MusicAssistant
-@dataclass
-class QueueItem(DataClassDictMixin):
- """Representation of a queue item."""
-
- uri: str
- name: str = ""
- duration: Optional[int] = None
- item_id: str = ""
- sort_index: int = 0
- streamdetails: Optional[StreamDetails] = None
- media_type: MediaType = MediaType.UNKNOWN
- image: Optional[str] = None
- available: bool = True
- media_item: Union[Track, Radio, None] = None
-
- def __post_init__(self):
- """Set default values."""
- if not self.item_id:
- self.item_id = str(uuid4())
- if not self.name:
- self.name = self.uri
-
- @classmethod
- def __pre_deserialize__(cls, d: Dict[Any, Any]) -> Dict[Any, Any]:
- """Run actions before deserialization."""
- d.pop("streamdetails", None)
- return d
-
- def __post_serialize__(self, d: Dict[Any, Any]) -> Dict[Any, Any]:
- """Run actions before serialization."""
- if self.media_type == MediaType.RADIO:
- d.pop("duration")
- return d
-
- @classmethod
- def from_media_item(cls, media_item: Track | Radio):
- """Construct QueueItem from track/radio item."""
- if isinstance(media_item, Track):
- artists = "/".join((x.name for x in media_item.artists))
- name = f"{artists} - {media_item.name}"
- else:
- name = media_item.name
- return cls(
- uri=media_item.uri,
- name=name,
- duration=media_item.duration,
- media_type=media_item.media_type,
- media_item=media_item,
- image=media_item.image,
- available=media_item.available,
- )
-
-
-class QueueSettings:
- """Representation of (user adjustable) PlayerQueue settings/preferences."""
-
- def __init__(self, queue: PlayerQueue) -> None:
- """Initialize."""
- self._queue = queue
- self.mass = queue.mass
- self._repeat_mode: RepeatMode = RepeatMode.OFF
- self._shuffle_enabled: bool = False
- self._crossfade_mode: CrossFadeMode = CrossFadeMode.DISABLED
- self._crossfade_duration: int = 6
- self._volume_normalization_enabled: bool = True
- self._volume_normalization_target: int = -23
-
- @property
- def repeat_mode(self) -> RepeatMode:
- """Return repeat enabled setting."""
- return self._repeat_mode
-
- @repeat_mode.setter
- def repeat_mode(self, enabled: bool) -> None:
- """Set repeat enabled setting."""
- if self._repeat_mode != enabled:
- self._repeat_mode = enabled
- self._on_update("repeat_mode")
-
- @property
- def shuffle_enabled(self) -> bool:
- """Return shuffle enabled setting."""
- return self._shuffle_enabled
-
- @shuffle_enabled.setter
- def shuffle_enabled(self, enabled: bool) -> None:
- """Set shuffle enabled setting."""
- if not self._shuffle_enabled and enabled:
- # shuffle requested
- self._shuffle_enabled = True
- if self._queue.current_index is not None:
- played_items = self._queue.items[: self._queue.current_index]
- next_items = self._queue.items[self._queue.current_index + 1 :]
- # for now we use default python random function
- # can be extended with some more magic based on last_played and stuff
- next_items = random.sample(next_items, len(next_items))
- items = played_items + [self._queue.current_item] + next_items
- asyncio.create_task(self._queue.update(items))
- self._on_update("shuffle_enabled")
- elif self._shuffle_enabled and not enabled:
- # unshuffle
- self._shuffle_enabled = False
- if self._queue.current_index is not None:
- played_items = self._queue.items[: self._queue.current_index]
- next_items = self._queue.items[self._queue.current_index + 1 :]
- next_items.sort(key=lambda x: x.sort_index, reverse=False)
- items = played_items + [self._queue.current_item] + next_items
- asyncio.create_task(self._queue.update(items))
- self._on_update("shuffle_enabled")
-
- @property
- def crossfade_mode(self) -> CrossFadeMode:
- """Return crossfade mode setting."""
- return self._crossfade_mode
-
- @crossfade_mode.setter
- def crossfade_mode(self, mode: CrossFadeMode) -> None:
- """Set crossfade enabled setting."""
- if self._crossfade_mode != mode:
- # TODO: restart the queue stream if its playing
- self._crossfade_mode = mode
- self._on_update("crossfade_mode")
-
- @property
- def crossfade_duration(self) -> int:
- """Return crossfade_duration setting."""
- return self._crossfade_duration
-
- @crossfade_duration.setter
- def crossfade_duration(self, duration: int) -> None:
- """Set crossfade_duration setting (1..10 seconds)."""
- duration = max(1, duration)
- duration = min(10, duration)
- if self._crossfade_duration != duration:
- self._crossfade_duration = duration
- self._on_update("crossfade_duration")
-
- @property
- def volume_normalization_enabled(self) -> bool:
- """Return volume_normalization_enabled setting."""
- return self._volume_normalization_enabled
-
- @volume_normalization_enabled.setter
- def volume_normalization_enabled(self, enabled: bool) -> None:
- """Set volume_normalization_enabled setting."""
- if self._volume_normalization_enabled != enabled:
- self._volume_normalization_enabled = enabled
- self._on_update("volume_normalization_enabled")
-
- @property
- def volume_normalization_target(self) -> float:
- """Return volume_normalization_target setting."""
- return self._volume_normalization_target
-
- @volume_normalization_target.setter
- def volume_normalization_target(self, target: float) -> None:
- """Set volume_normalization_target setting (-40..10 LUFS)."""
- target = max(-40, target)
- target = min(10, target)
- if self._volume_normalization_target != target:
- self._volume_normalization_target = target
- self._on_update("volume_normalization_target")
-
- @property
- def stream_type(self) -> ContentType:
- """Return supported/preferred stream type for playerqueue. Read only."""
- # determine default stream type from player capabilities
- return next(
- x
- for x in (
- ContentType.FLAC,
- ContentType.WAV,
- ContentType.PCM_S16LE,
- ContentType.MP3,
- ContentType.MPEG,
- )
- if x in self._queue.player.supported_content_types
- )
-
- def to_dict(self) -> Dict[str, Any]:
- """Return dict from settings."""
- return {
- "repeat_mode": self.repeat_mode.value,
- "shuffle_enabled": self.shuffle_enabled,
- "crossfade_mode": self.crossfade_mode.value,
- "crossfade_duration": self.crossfade_duration,
- "volume_normalization_enabled": self.volume_normalization_enabled,
- "volume_normalization_target": self.volume_normalization_target,
- }
-
- async def restore(self) -> None:
- """Restore state from db."""
- async with self.mass.database.get_db() as _db:
- for key, val_type in (
- ("repeat_mode", RepeatMode),
- ("crossfade_mode", CrossFadeMode),
- ("shuffle_enabled", bool),
- ("crossfade_duration", int),
- ("volume_normalization_enabled", bool),
- ("volume_normalization_target", float),
- ):
- db_key = f"{self._queue.queue_id}_{key}"
- if db_value := await self.mass.database.get_setting(db_key, db=_db):
- value = val_type(db_value["value"])
- setattr(self, f"_{key}", value)
-
- def _on_update(self, changed_key: Optional[str] = None) -> None:
- """Handle state changed."""
- self._queue.signal_update()
- self.mass.create_task(self.save(changed_key))
- # TODO: restart play if setting changed that impacts playing queue
-
- async def save(self, changed_key: Optional[str] = None) -> None:
- """Save state in db."""
- async with self.mass.database.get_db() as _db:
- for key, value in self.to_dict().items():
- if key == changed_key or changed_key is None:
- db_key = f"{self._queue.queue_id}_{key}"
- await self.mass.database.set_setting(db_key, value, db=_db)
-
-
class PlayerQueue:
"""Represents a PlayerQueue object."""
# index_in_buffer: which track is currently (pre)loaded in the streamer
self._index_in_buffer: Optional[int] = None
self._current_item_elapsed_time: int = 0
- self._last_item: Optional[QueueItem] = None
+ self._prev_item: Optional[QueueItem] = None
# start_index: from which index did the queuestream start playing
self._start_index: int = 0
self._next_start_index: int = 0
continue
queue_items.append(QueueItem.from_media_item(track))
+ # clear queue first if it was finished
+ if self._current_index >= (len(self._items) - 1):
+ self._current_index = None
+ self._items = []
+
# load items into the queue
if queue_opt == QueueOption.REPLACE:
await self.load(queue_items, passive=passive)
async def play_index(self, index: Union[int, str], passive: bool = False) -> None:
"""Play item at index (or item_id) X in queue."""
- # power on player when requesting play
- if not self.player.powered:
- await self.player.power(True)
if self.player.use_multi_stream:
await self.mass.streams.stop_multi_client_queue_stream(self.queue_id)
if not isinstance(index, int):
self.signal_update()
if self.player.state != PlayerState.PLAYING:
# handle end of queue
- if self._current_index >= (len(self._items) - 1):
+ if (self._current_index or 0) >= (len(self._items) - 1):
self._current_index += 1
self._current_item_elapsed_time = 0
# repeat enabled (of whole queue), play queue from beginning
# check if a new track is loaded, wait for the streamdetails
if (
self.current_item
- and self._last_item != self.current_item
+ and self._prev_item != self.current_item
and self.current_item.streamdetails
):
# new active item in queue
new_item_loaded = True
# invalidate previous streamdetails
- if self._last_item:
- self._last_item.streamdetails = None
- self._last_item = self.current_item
+ if self._prev_item:
+ self._prev_item.streamdetails = None
+ self._prev_item = self.current_item
# update vars and signal update on eventbus if needed
prev_item_time = int(self._current_item_elapsed_time)
self._current_item_elapsed_time = int(track_time)
--- /dev/null
+"""Model a QueueItem."""
+from __future__ import annotations
+
+from dataclasses import dataclass
+from typing import Any, Dict, Optional, Union
+from uuid import uuid4
+
+from mashumaro import DataClassDictMixin
+
+from music_assistant.models.enums import MediaType
+from music_assistant.models.media_items import Radio, StreamDetails, Track
+
+
+@dataclass
+class QueueItem(DataClassDictMixin):
+ """Representation of a queue item."""
+
+ uri: str
+ name: str = ""
+ duration: Optional[int] = None
+ item_id: str = ""
+ sort_index: int = 0
+ streamdetails: Optional[StreamDetails] = None
+ media_type: MediaType = MediaType.UNKNOWN
+ image: Optional[str] = None
+ available: bool = True
+ media_item: Union[Track, Radio, None] = None
+
+ def __post_init__(self):
+ """Set default values."""
+ if not self.item_id:
+ self.item_id = str(uuid4())
+ if not self.name:
+ self.name = self.uri
+
+ @classmethod
+ def __pre_deserialize__(cls, d: Dict[Any, Any]) -> Dict[Any, Any]:
+ """Run actions before deserialization."""
+ d.pop("streamdetails", None)
+ return d
+
+ def __post_serialize__(self, d: Dict[Any, Any]) -> Dict[Any, Any]:
+ """Run actions before serialization."""
+ if self.media_type == MediaType.RADIO:
+ d.pop("duration")
+ return d
+
+ @classmethod
+ def from_media_item(cls, media_item: Track | Radio):
+ """Construct QueueItem from track/radio item."""
+ if isinstance(media_item, Track):
+ artists = "/".join((x.name for x in media_item.artists))
+ name = f"{artists} - {media_item.name}"
+ else:
+ name = media_item.name
+ return cls(
+ uri=media_item.uri,
+ name=name,
+ duration=media_item.duration,
+ media_type=media_item.media_type,
+ media_item=media_item,
+ image=media_item.image,
+ available=media_item.available,
+ )
--- /dev/null
+"""Model for a PlayerQueue's settings."""
+from __future__ import annotations
+
+import asyncio
+import random
+from typing import TYPE_CHECKING, Any, Dict, Optional
+
+from .enums import ContentType, CrossFadeMode, RepeatMode
+
+if TYPE_CHECKING:
+ from .player_queue import PlayerQueue
+
+
+class QueueSettings:
+ """Representation of (user adjustable) PlayerQueue settings/preferences."""
+
+ def __init__(self, queue: PlayerQueue) -> None:
+ """Initialize."""
+ self._queue = queue
+ self.mass = queue.mass
+ self._repeat_mode: RepeatMode = RepeatMode.OFF
+ self._shuffle_enabled: bool = False
+ self._crossfade_mode: CrossFadeMode = CrossFadeMode.DISABLED
+ self._crossfade_duration: int = 6
+ self._volume_normalization_enabled: bool = True
+ self._volume_normalization_target: int = -23
+
+ @property
+ def repeat_mode(self) -> RepeatMode:
+ """Return repeat enabled setting."""
+ return self._repeat_mode
+
+ @repeat_mode.setter
+ def repeat_mode(self, enabled: bool) -> None:
+ """Set repeat enabled setting."""
+ if self._repeat_mode != enabled:
+ self._repeat_mode = enabled
+ self._on_update("repeat_mode")
+
+ @property
+ def shuffle_enabled(self) -> bool:
+ """Return shuffle enabled setting."""
+ return self._shuffle_enabled
+
+ @shuffle_enabled.setter
+ def shuffle_enabled(self, enabled: bool) -> None:
+ """Set shuffle enabled setting."""
+ if not self._shuffle_enabled and enabled:
+ # shuffle requested
+ self._shuffle_enabled = True
+ if self._queue.current_index is not None:
+ played_items = self._queue.items[: self._queue.current_index]
+ next_items = self._queue.items[self._queue.current_index + 1 :]
+ # for now we use default python random function
+ # can be extended with some more magic based on last_played and stuff
+ next_items = random.sample(next_items, len(next_items))
+ items = played_items + [self._queue.current_item] + next_items
+ asyncio.create_task(self._queue.update(items))
+ self._on_update("shuffle_enabled")
+ elif self._shuffle_enabled and not enabled:
+ # unshuffle
+ self._shuffle_enabled = False
+ if self._queue.current_index is not None:
+ played_items = self._queue.items[: self._queue.current_index]
+ next_items = self._queue.items[self._queue.current_index + 1 :]
+ next_items.sort(key=lambda x: x.sort_index, reverse=False)
+ items = played_items + [self._queue.current_item] + next_items
+ asyncio.create_task(self._queue.update(items))
+ self._on_update("shuffle_enabled")
+
+ @property
+ def crossfade_mode(self) -> CrossFadeMode:
+ """Return crossfade mode setting."""
+ return self._crossfade_mode
+
+ @crossfade_mode.setter
+ def crossfade_mode(self, mode: CrossFadeMode) -> None:
+ """Set crossfade enabled setting."""
+ if self._crossfade_mode != mode:
+ # TODO: restart the queue stream if its playing
+ self._crossfade_mode = mode
+ self._on_update("crossfade_mode")
+
+ @property
+ def crossfade_duration(self) -> int:
+ """Return crossfade_duration setting."""
+ return self._crossfade_duration
+
+ @crossfade_duration.setter
+ def crossfade_duration(self, duration: int) -> None:
+ """Set crossfade_duration setting (1..10 seconds)."""
+ duration = max(1, duration)
+ duration = min(10, duration)
+ if self._crossfade_duration != duration:
+ self._crossfade_duration = duration
+ self._on_update("crossfade_duration")
+
+ @property
+ def volume_normalization_enabled(self) -> bool:
+ """Return volume_normalization_enabled setting."""
+ return self._volume_normalization_enabled
+
+ @volume_normalization_enabled.setter
+ def volume_normalization_enabled(self, enabled: bool) -> None:
+ """Set volume_normalization_enabled setting."""
+ if self._volume_normalization_enabled != enabled:
+ self._volume_normalization_enabled = enabled
+ self._on_update("volume_normalization_enabled")
+
+ @property
+ def volume_normalization_target(self) -> float:
+ """Return volume_normalization_target setting."""
+ return self._volume_normalization_target
+
+ @volume_normalization_target.setter
+ def volume_normalization_target(self, target: float) -> None:
+ """Set volume_normalization_target setting (-40..10 LUFS)."""
+ target = max(-40, target)
+ target = min(10, target)
+ if self._volume_normalization_target != target:
+ self._volume_normalization_target = target
+ self._on_update("volume_normalization_target")
+
+ @property
+ def stream_type(self) -> ContentType:
+ """Return supported/preferred stream type for playerqueue. Read only."""
+ # determine default stream type from player capabilities
+ return next(
+ x
+ for x in (
+ ContentType.FLAC,
+ ContentType.WAV,
+ ContentType.PCM_S16LE,
+ ContentType.MP3,
+ ContentType.MPEG,
+ )
+ if x in self._queue.player.supported_content_types
+ )
+
+ def to_dict(self) -> Dict[str, Any]:
+ """Return dict from settings."""
+ return {
+ "repeat_mode": self.repeat_mode.value,
+ "shuffle_enabled": self.shuffle_enabled,
+ "crossfade_mode": self.crossfade_mode.value,
+ "crossfade_duration": self.crossfade_duration,
+ "volume_normalization_enabled": self.volume_normalization_enabled,
+ "volume_normalization_target": self.volume_normalization_target,
+ }
+
+ async def restore(self) -> None:
+ """Restore state from db."""
+ async with self.mass.database.get_db() as _db:
+ for key, val_type in (
+ ("repeat_mode", RepeatMode),
+ ("crossfade_mode", CrossFadeMode),
+ ("shuffle_enabled", bool),
+ ("crossfade_duration", int),
+ ("volume_normalization_enabled", bool),
+ ("volume_normalization_target", float),
+ ):
+ db_key = f"{self._queue.queue_id}_{key}"
+ if db_value := await self.mass.database.get_setting(db_key, db=_db):
+ value = val_type(db_value["value"])
+ setattr(self, f"_{key}", value)
+
+ def _on_update(self, changed_key: Optional[str] = None) -> None:
+ """Handle state changed."""
+ self._queue.signal_update()
+ self.mass.create_task(self.save(changed_key))
+ # TODO: restart play if setting changed that impacts playing queue
+
+ async def save(self, changed_key: Optional[str] = None) -> None:
+ """Save state in db."""
+ async with self.mass.database.get_db() as _db:
+ for key, value in self.to_dict().items():
+ if key == changed_key or changed_key is None:
+ db_key = f"{self._queue.queue_id}_{key}"
+ await self.mass.database.set_setting(db_key, value, db=_db)