from music_assistant.helpers.typing import MusicAssistant
from music_assistant.helpers.util import get_ip
from music_assistant.models.errors import MediaNotFoundError, MusicAssistantError
-from music_assistant.models.media_items import ContentType
-from music_assistant.models.player_queue import PlayerQueue
+from music_assistant.models.media_items import ContentType, MediaType
+from music_assistant.models.player_queue import CrossFadeMode, PlayerQueue, QueueItem
class StreamController:
self._time_started: Dict[str, float] = {}
def get_stream_url(
- self, queue_id: str, child_player: Optional[str] = None, fmt: str = "flac"
+ self,
+ queue_id: str,
+ child_player: Optional[str] = None,
+ content_type: ContentType = ContentType.FLAC,
) -> str:
"""Return the full stream url for the PlayerQueue Stream."""
+ ext = content_type.value
if child_player:
- return f"http://{self._ip}:{self._port}/{queue_id}/{child_player}.{fmt}"
- return f"http://{self._ip}:{self._port}/{queue_id}.{fmt}"
+ return f"http://{self._ip}:{self._port}/{queue_id}/{child_player}.{ext}"
+ return f"http://{self._ip}:{self._port}/{queue_id}.{ext}"
async def get_preview_url(self, provider: str, track_id: str) -> str:
"""Return url to short preview sample."""
self.mass.subscribe(on_shutdown_event, EventType.SHUTDOWN)
sox_present, ffmpeg_present = await check_audio_support(True)
- if not ffmpeg_present:
+ if not ffmpeg_present and not sox_present:
self.logger.error(
+ "SoX or FFmpeg binary not found on your system, "
+ "playback will NOT work!."
+ )
+ elif not ffmpeg_present:
+ self.logger.warning(
"The FFmpeg binary was not found on your system, "
- "you might have issues with playback. "
+ "you might experience issues with playback. "
"Please install FFmpeg with your OS package manager.",
)
elif not sox_present:
self.logger.warning(
- "The SoX binary was not found on your system so FFmpeg is used as fallback. "
- "For best audio quality, please install SoX with your OS package manager.",
+ "The SoX binary was not found on your system, FFmpeg is used as fallback."
)
self.logger.info("Started stream server on port %s", self._port)
start_streamdetails = await queue.queue_stream_prepare()
output_fmt = ContentType(fmt)
+ # work out sample rate
+ if queue.settings.crossfade_mode == CrossFadeMode.ALWAYS:
+ sample_rate = min(96000, queue.max_sample_rate)
+ bit_depth = 24
+ channels = 2
+ resample = True
+ else:
+ sample_rate = start_streamdetails.sample_rate
+ bit_depth = start_streamdetails.bit_depth
+ channels = start_streamdetails.channels
+ resample = False
sox_args = await get_sox_args_for_pcm_stream(
- start_streamdetails.sample_rate,
- start_streamdetails.bit_depth,
- start_streamdetails.channels,
+ sample_rate,
+ bit_depth,
+ channels,
output_format=output_fmt,
)
- # get the raw pcm bytes from the queue stream and on the fly encode as to wanted format
+ # get the raw pcm bytes from the queue stream and on the fly encode to wanted format
# send the compressed/endoded stream to the client.
async with AsyncProcess(sox_args, True) as sox_proc:
sample_rate=start_streamdetails.sample_rate,
bit_depth=start_streamdetails.bit_depth,
channels=start_streamdetails.channels,
+ resample=resample,
):
if sox_proc.closed:
return
return resp
async def serve_multi_client_queue_stream(self, request: web.Request):
- """Serve queue audio stream to multiple (group)clients in the raw PCM format."""
+ """Serve queue audio stream to multiple (group)clients."""
queue_id = request.match_info["queue_id"]
player_id = request.match_info["player_id"]
fmt = request.match_info.get("format", "flac")
queue = self.mass.players.get_player_queue(queue_id)
start_streamdetails = await queue.queue_stream_prepare()
+ # work out sample rate
+ if queue.settings.crossfade_mode == CrossFadeMode.ALWAYS:
+ sample_rate = min(96000, queue.max_sample_rate)
+ bit_depth = 24
+ channels = 2
+ resample = True
+ else:
+ sample_rate = start_streamdetails.sample_rate
+ bit_depth = start_streamdetails.bit_depth
+ channels = start_streamdetails.channels
+ resample = False
sox_args = await get_sox_args_for_pcm_stream(
- start_streamdetails.sample_rate,
- start_streamdetails.bit_depth,
- start_streamdetails.channels,
+ sample_rate,
+ bit_depth,
+ channels,
output_format=output_fmt,
)
self.logger.debug("Multi client queue stream %s started", queue.queue_id)
try:
- # get the raw pcm bytes from the queue stream and on the fly encode as to wanted format
+ # get the raw pcm bytes from the queue stream and on the fly encode to wanted format
# send the compressed/endoded stream to the client.
async with AsyncProcess(sox_args, True) as sox_proc:
sample_rate=start_streamdetails.sample_rate,
bit_depth=start_streamdetails.bit_depth,
channels=start_streamdetails.channels,
+ resample=resample,
):
if sox_proc.closed:
return
last_fadeout_data = b""
queue_index = None
track_count = 0
+ prev_track: Optional[QueueItem] = None
pcm_fmt = ContentType.from_bit_depth(bit_depth)
self.logger.info(
if not resample and streamdetails.bit_depth > bit_depth:
await queue.queue_stream_signal_next()
self.logger.info("Abort queue stream due to bit depth mismatch")
- await queue.queue_stream_signal_next()
break
if (
not resample
await queue.queue_stream_signal_next()
break
+ # check crossfade ability
+ use_crossfade = queue.settings.crossfade_mode != CrossFadeMode.DISABLED
+ if (
+ prev_track is not None
+ and prev_track.media_type == MediaType.TRACK
+ and queue_track.media_type == MediaType.TRACK
+ ):
+ prev_item = await self.mass.music.get_item_by_uri(prev_track.uri)
+ new_item = await self.mass.music.get_item_by_uri(queue_track.uri)
+ if (
+ prev_item.album is not None
+ and new_item.album is not None
+ and prev_item.album == new_item.album
+ ):
+ use_crossfade = False
+ prev_track = queue_track
+
sample_size = int(sample_rate * (bit_depth / 8) * channels) # 1 second
- buffer_size = sample_size * (
- queue.crossfade_duration or 1
- ) # 1...10 seconds
+ buffer_size = sample_size * (queue.settings.crossfade_duration or 2)
self.logger.debug(
"Start Streaming queue track: %s (%s) for queue %s",
# part is too short after the strip action
# so we just use the entire original data
last_part = prev_chunk + chunk
- if not queue.crossfade_duration or len(last_part) < buffer_size:
+ if not use_crossfade or len(last_part) < buffer_size:
# crossfading is not enabled or not enough data,
# so just pass the (stripped) audio data
- if queue.crossfade_duration:
+ if use_crossfade:
self.logger.warning(
"Not enough data for crossfade: %s", len(last_part)
)
# pylint: disable=invalid-name
-SCHEMA_VERSION = 4
+SCHEMA_VERSION = 5
TABLE_PROV_MAPPINGS = "provider_mappings"
TABLE_TRACK_LOUDNESS = "track_loudness"
TABLE_PLAYLISTS = "playlists"
TABLE_RADIOS = "radios"
TABLE_CACHE = "cache"
+TABLE_SETTINGS = "settings"
class Database:
async with Db(self.url, timeout=360) as _db:
yield _db
+ async def get_setting(self, key: str, db: Optional[Db] = None) -> str | None:
+ """Get setting from settings table."""
+ return await self.get_row(TABLE_SETTINGS, {"key": key}, db=db)
+
+ async def set_setting(self, key: str, value: str, db: Optional[Db] = None) -> None:
+ """Set setting in settings table."""
+ if not isinstance(value, str):
+ value = str(value)
+ return await self.insert_or_replace(
+ TABLE_SETTINGS, {"key": key, "value": value}
+ )
+
async def get_rows(
self,
table: str,
async def _migrate(self):
"""Perform database migration actions if needed."""
- prev_version = await self.get_row("settings", {"key": "version"})
- if prev_version:
- prev_version = int(prev_version["value"])
- else:
- prev_version = 0
- if SCHEMA_VERSION != prev_version:
- self.logger.info(
- "Performing database migration from %s to %s",
- prev_version,
- SCHEMA_VERSION,
- )
+ async with self.get_db() as db:
+ if await self.exists(TABLE_SETTINGS, db):
+ prev_version = await self.get_setting("version", db)
+ prev_version = int(prev_version["value"])
+ else:
+ prev_version = 0
- if prev_version < 3:
- # schema version 3: too many breaking changes, rebuild db
- async with self.get_db() as _db:
- await _db.execute(f"DROP TABLE IF EXISTS {TABLE_ARTISTS}")
- await _db.execute(f"DROP TABLE IF EXISTS {TABLE_ALBUMS}")
- await _db.execute(f"DROP TABLE IF EXISTS {TABLE_TRACKS}")
- await _db.execute(f"DROP TABLE IF EXISTS {TABLE_PLAYLISTS}")
- await _db.execute(f"DROP TABLE IF EXISTS {TABLE_RADIOS}")
- await _db.execute(f"DROP TABLE IF EXISTS {TABLE_PROV_MAPPINGS}")
- await _db.execute(f"DROP TABLE IF EXISTS {TABLE_CACHE}")
+ if SCHEMA_VERSION != prev_version:
+ self.logger.info(
+ "Performing database migration from %s to %s",
+ prev_version,
+ SCHEMA_VERSION,
+ )
- if prev_version < 4:
- # schema version 4: add album to tracks table
- async with self.get_db() as _db:
- await _db.execute("DROP TABLE IF EXISTS tracks")
- if await self.exists(TABLE_PROV_MAPPINGS, _db):
+ if prev_version < 3:
+ # schema version 3: too many breaking changes, rebuild db
+ await db.execute(f"DROP TABLE IF EXISTS {TABLE_ARTISTS}")
+ await db.execute(f"DROP TABLE IF EXISTS {TABLE_ALBUMS}")
+ await db.execute(f"DROP TABLE IF EXISTS {TABLE_TRACKS}")
+ await db.execute(f"DROP TABLE IF EXISTS {TABLE_PLAYLISTS}")
+ await db.execute(f"DROP TABLE IF EXISTS {TABLE_RADIOS}")
+ await db.execute(f"DROP TABLE IF EXISTS {TABLE_PROV_MAPPINGS}")
+ await db.execute(f"DROP TABLE IF EXISTS {TABLE_CACHE}")
+
+ if prev_version < 4:
+ # schema version 4: add album to tracks table
+ await db.execute("DROP TABLE IF EXISTS tracks")
+ if await self.exists(TABLE_PROV_MAPPINGS, db):
await self.delete(
- TABLE_PROV_MAPPINGS, {"media_type": "track"}, db=_db
+ TABLE_PROV_MAPPINGS, {"media_type": "track"}, db=db
)
- # create db tables
- await self.__create_database_tables()
- # store current schema version
- await self.insert_or_replace(
- "settings", {"key": "version", "value": str(SCHEMA_VERSION)}
- )
+ if prev_version < 5:
+ # delete player_settings table: use generic settings table instead
+ await db.execute("DROP TABLE IF EXISTS queue_settings")
+
+ # create db tables
+ await self.__create_database_tables(db)
+ # store current schema version
+ await self.set_setting("version", str(SCHEMA_VERSION), db=db)
- async def __create_database_tables(self) -> None:
+ @staticmethod
+ async def __create_database_tables(db: Db) -> None:
"""Init generic database tables."""
- async with self.mass.database.get_db() as _db:
- await _db.execute(
- f"""CREATE TABLE IF NOT EXISTS {TABLE_PROV_MAPPINGS}(
- item_id INTEGER NOT NULL,
- media_type TEXT NOT NULL,
- prov_item_id TEXT NOT NULL,
- provider TEXT NOT NULL,
- quality INTEGER NULL,
- details TEXT NULL,
- url TEXT NULL,
- UNIQUE(item_id, media_type, prov_item_id, provider)
- );"""
- )
- await _db.execute(
- f"""CREATE TABLE IF NOT EXISTS {TABLE_TRACK_LOUDNESS}(
- item_id INTEGER NOT NULL,
- provider TEXT NOT NULL,
- loudness REAL,
- UNIQUE(item_id, provider));"""
- )
- await _db.execute(
- f"""CREATE TABLE IF NOT EXISTS {TABLE_PLAYLOG}(
+ await db.execute(
+ f"""CREATE TABLE IF NOT EXISTS {TABLE_PROV_MAPPINGS}(
item_id INTEGER NOT NULL,
+ media_type TEXT NOT NULL,
+ prov_item_id TEXT NOT NULL,
provider TEXT NOT NULL,
- timestamp REAL,
- UNIQUE(item_id, provider));"""
- )
- await _db.execute(
- f"""CREATE TABLE IF NOT EXISTS {TABLE_ALBUMS}(
- item_id INTEGER PRIMARY KEY AUTOINCREMENT,
- name TEXT NOT NULL,
- sort_name TEXT NOT NULL,
- album_type TEXT,
- year INTEGER,
- version TEXT,
- in_library BOOLEAN DEFAULT 0,
- upc TEXT,
- musicbrainz_id TEXT,
- artist json,
- metadata json,
- provider_ids json
- );"""
- )
- await _db.execute(
- f"""CREATE TABLE IF NOT EXISTS {TABLE_ARTISTS}(
- item_id INTEGER PRIMARY KEY AUTOINCREMENT,
- name TEXT NOT NULL,
- sort_name TEXT NOT NULL,
- musicbrainz_id TEXT NOT NULL UNIQUE,
- in_library BOOLEAN DEFAULT 0,
- metadata json,
- provider_ids json
- );"""
- )
- await _db.execute(
- f"""CREATE TABLE IF NOT EXISTS {TABLE_TRACKS}(
- item_id INTEGER PRIMARY KEY AUTOINCREMENT,
- name TEXT NOT NULL,
- sort_name TEXT NOT NULL,
- version TEXT,
- duration INTEGER,
- in_library BOOLEAN DEFAULT 0,
- isrc TEXT,
- musicbrainz_id TEXT,
- artists json,
- album json,
- metadata json,
- provider_ids json
- );"""
- )
- await _db.execute(
- f"""CREATE TABLE IF NOT EXISTS {TABLE_PLAYLISTS}(
- item_id INTEGER PRIMARY KEY AUTOINCREMENT,
- name TEXT NOT NULL,
- sort_name TEXT NOT NULL,
- owner TEXT NOT NULL,
- is_editable BOOLEAN NOT NULL,
- checksum TEXT NOT NULL,
- in_library BOOLEAN DEFAULT 0,
- metadata json,
- provider_ids json,
- UNIQUE(name, owner)
+ quality INTEGER NULL,
+ details TEXT NULL,
+ url TEXT NULL,
+ UNIQUE(item_id, media_type, prov_item_id, provider)
);"""
- )
- await _db.execute(
- f"""CREATE TABLE IF NOT EXISTS {TABLE_RADIOS}(
- item_id INTEGER PRIMARY KEY AUTOINCREMENT,
- name TEXT NOT NULL UNIQUE,
- sort_name TEXT NOT NULL,
- in_library BOOLEAN DEFAULT 0,
- metadata json,
- provider_ids json
+ )
+ await db.execute(
+ f"""CREATE TABLE IF NOT EXISTS {TABLE_TRACK_LOUDNESS}(
+ item_id INTEGER NOT NULL,
+ provider TEXT NOT NULL,
+ loudness REAL,
+ UNIQUE(item_id, provider));"""
+ )
+ await db.execute(
+ f"""CREATE TABLE IF NOT EXISTS {TABLE_PLAYLOG}(
+ item_id INTEGER NOT NULL,
+ provider TEXT NOT NULL,
+ timestamp REAL,
+ UNIQUE(item_id, provider));"""
+ )
+ await db.execute(
+ f"""CREATE TABLE IF NOT EXISTS {TABLE_ALBUMS}(
+ item_id INTEGER PRIMARY KEY AUTOINCREMENT,
+ name TEXT NOT NULL,
+ sort_name TEXT NOT NULL,
+ album_type TEXT,
+ year INTEGER,
+ version TEXT,
+ in_library BOOLEAN DEFAULT 0,
+ upc TEXT,
+ musicbrainz_id TEXT,
+ artist json,
+ metadata json,
+ provider_ids json
+ );"""
+ )
+ await db.execute(
+ f"""CREATE TABLE IF NOT EXISTS {TABLE_ARTISTS}(
+ item_id INTEGER PRIMARY KEY AUTOINCREMENT,
+ name TEXT NOT NULL,
+ sort_name TEXT NOT NULL,
+ musicbrainz_id TEXT NOT NULL UNIQUE,
+ in_library BOOLEAN DEFAULT 0,
+ metadata json,
+ provider_ids json
);"""
- )
+ )
+ await db.execute(
+ f"""CREATE TABLE IF NOT EXISTS {TABLE_TRACKS}(
+ item_id INTEGER PRIMARY KEY AUTOINCREMENT,
+ name TEXT NOT NULL,
+ sort_name TEXT NOT NULL,
+ version TEXT,
+ duration INTEGER,
+ in_library BOOLEAN DEFAULT 0,
+ isrc TEXT,
+ musicbrainz_id TEXT,
+ artists json,
+ album json,
+ metadata json,
+ provider_ids json
+ );"""
+ )
+ await db.execute(
+ f"""CREATE TABLE IF NOT EXISTS {TABLE_PLAYLISTS}(
+ item_id INTEGER PRIMARY KEY AUTOINCREMENT,
+ name TEXT NOT NULL,
+ sort_name TEXT NOT NULL,
+ owner TEXT NOT NULL,
+ is_editable BOOLEAN NOT NULL,
+ checksum TEXT NOT NULL,
+ in_library BOOLEAN DEFAULT 0,
+ metadata json,
+ provider_ids json,
+ UNIQUE(name, owner)
+ );"""
+ )
+ await db.execute(
+ f"""CREATE TABLE IF NOT EXISTS {TABLE_RADIOS}(
+ item_id INTEGER PRIMARY KEY AUTOINCREMENT,
+ name TEXT NOT NULL UNIQUE,
+ sort_name TEXT NOT NULL,
+ in_library BOOLEAN DEFAULT 0,
+ metadata json,
+ provider_ids json
+ );"""
+ )
)
+class CrossFadeMode(Enum):
+ """Enum with crossfade modes."""
+
+ DISABLED = "disabled" # no crossfading at all
+ STRICT = "strict" # do not crossfade tracks of same album
+ SMART = "smart" # crossfade if possible (do not crossfade different sample rates)
+ ALWAYS = "always" # all tracks - resample to fixed sample rate
+
+
+class RepeatMode(Enum):
+ """Enum with repeat modes."""
+
+ DISABLED = "disabled" # no repeat at all
+ SINGLE = "single" # repeat current/single track
+ ALL = "all" # repeat entire queue
+
+
+class QueueSettings:
+ """Representation of (user adjustable) PlayerQueue settings/preferences."""
+
+ def __init__(self, queue: PlayerQueue) -> None:
+ """Initialize."""
+ self._queue = queue
+ self.mass = queue.mass
+ self._repeat_mode: RepeatMode = RepeatMode.DISABLED
+ self._shuffle_enabled: bool = False
+ self._crossfade_mode: CrossFadeMode = CrossFadeMode.DISABLED
+ self._crossfade_duration: int = 6
+ self._volume_normalization_enabled: bool = True
+ self._volume_normalization_target: int = -23
+
+ @property
+ def repeat_mode(self) -> RepeatMode:
+ """Return repeat enabled setting."""
+ return self._repeat_mode
+
+ @repeat_mode.setter
+ def repeat_mode(self, enabled: bool) -> None:
+ """Set repeat enabled setting."""
+ if self._repeat_mode != enabled:
+ self._repeat_mode = enabled
+ self._on_update("repeat_mode")
+
+ @property
+ def shuffle_enabled(self) -> bool:
+ """Return shuffle enabled setting."""
+ return self._shuffle_enabled
+
+ @shuffle_enabled.setter
+ def shuffle_enabled(self, enabled: bool) -> None:
+ """Set shuffle enabled setting."""
+ if not self._shuffle_enabled and enabled:
+ # shuffle requested
+ self._shuffle_enabled = True
+ if self._queue.current_index is not None:
+ played_items = self._queue.items[: self._queue.current_index]
+ next_items = self._queue.items[self._queue.current_index + 1 :]
+ # for now we use default python random function
+ # can be extended with some more magic based on last_played and stuff
+ next_items = random.sample(next_items, len(next_items))
+ items = played_items + [self._queue.current_item] + next_items
+ asyncio.create_task(self._queue.update(items))
+ self._on_update("shuffle_enabled")
+ elif self._shuffle_enabled and not enabled:
+ # unshuffle
+ self._shuffle_enabled = False
+ if self._queue.current_index is not None:
+ played_items = self._queue.items[: self._queue.current_index]
+ next_items = self._queue.items[self._queue.current_index + 1 :]
+ next_items.sort(key=lambda x: x.sort_index, reverse=False)
+ items = played_items + [self._queue.current_item] + next_items
+ asyncio.create_task(self._queue.update(items))
+ self._on_update("shuffle_enabled")
+
+ @property
+ def crossfade_mode(self) -> CrossFadeMode:
+ """Return crossfade mode setting."""
+ return self._crossfade_mode
+
+ @crossfade_mode.setter
+ def crossfade_mode(self, mode: CrossFadeMode) -> None:
+ """Set crossfade enabled setting."""
+ if self._crossfade_mode != mode:
+ # TODO: restart the queue stream if its playing
+ self._crossfade_mode = mode
+ self._on_update("crossfade_mode")
+
+ @property
+ def crossfade_duration(self) -> int:
+ """Return crossfade_duration setting."""
+ return self._crossfade_duration
+
+ @crossfade_duration.setter
+ def crossfade_duration(self, duration: int) -> None:
+ """Set crossfade_duration setting (1..10 seconds)."""
+ duration = max(1, duration)
+ duration = min(10, duration)
+ if self._crossfade_duration != duration:
+ self._crossfade_duration = duration
+ self._on_update("crossfade_duration")
+
+ @property
+ def volume_normalization_enabled(self) -> bool:
+ """Return volume_normalization_enabled setting."""
+ return self._volume_normalization_enabled
+
+ @volume_normalization_enabled.setter
+ def volume_normalization_enabled(self, enabled: bool) -> None:
+ """Set volume_normalization_enabled setting."""
+ if self._volume_normalization_enabled != enabled:
+ self._volume_normalization_enabled = enabled
+ self._on_update("volume_normalization_enabled")
+ self.save()
+
+ @property
+ def volume_normalization_target(self) -> float:
+ """Return volume_normalization_target setting."""
+ return self._volume_normalization_target
+
+ @volume_normalization_target.setter
+ def volume_normalization_target(self, target: float) -> None:
+ """Set volume_normalization_target setting (-40..10 LUFS)."""
+ target = max(-40, target)
+ target = min(10, target)
+ if self._volume_normalization_target != target:
+ self._volume_normalization_target = target
+ self._on_update("volume_normalization_target")
+
+ @property
+ def stream_type(self) -> ContentType:
+ """Return supported/preferred stream type for playerqueue. Read only."""
+ # determine default stream type from player capabilities
+ return next(
+ x
+ for x in (
+ ContentType.FLAC,
+ ContentType.WAV,
+ ContentType.PCM_S16LE,
+ ContentType.MP3,
+ ContentType.MPEG,
+ )
+ if x in self._queue.player.supported_content_types
+ )
+
+ def to_dict(self) -> Dict[str, Any]:
+ """Return dict from settings."""
+ return {
+ "repeat_mode": self.repeat_mode.value,
+ "shuffle_enabled": self.shuffle_enabled,
+ "crossfade_mode": self.crossfade_mode,
+ "crossfade_duration": self.crossfade_duration,
+ "volume_normalization_enabled": self.volume_normalization_enabled,
+ "volume_normalization_target": self.volume_normalization_target,
+ }
+
+ async def restore(self) -> None:
+ """Restore state from db."""
+ async with self.mass.database.get_db() as _db:
+ for key, val_type in (
+ ("repeat_mode", RepeatMode),
+ ("crossfade_mode", CrossFadeMode),
+ ("shuffle_enabled", bool),
+ ("crossfade_duration", int),
+ ("volume_normalization_enabled", bool),
+ ("volume_normalization_target", float),
+ ):
+ db_key = f"{self._queue.queue_id}_{key}"
+ if db_value := await self.mass.database.get_setting(db_key, db=_db):
+ value = val_type(db_value["value"])
+ setattr(self, f"_{key}", value)
+
+ def _on_update(self, changed_key: Optional[str] = None) -> None:
+ """Handle state changed."""
+ self._queue.signal_update()
+ self.mass.create_task(self.save(changed_key))
+ # TODO: restart play if setting changed that impacts playing queue
+
+ async def save(self, changed_key: Optional[str] = None) -> None:
+ """Save state in db."""
+ async with self.mass.database.get_db() as _db:
+ for key, value in self.to_dict().items():
+ if key == changed_key or changed_key is None:
+ db_key = f"{self._queue.queue_id}_{key}"
+ await self.mass.database.set_setting(db_key, value, db=_db)
+
+
class PlayerQueue:
"""Represents a PlayerQueue object."""
self.mass = mass
self.logger = mass.players.logger
self.queue_id = player_id
-
- self._shuffle_enabled: bool = False
- self._repeat_enabled: bool = False
- self._crossfade_duration: int = 0
- self._volume_normalization_enabled: bool = True
- self._volume_normalization_target: int = -23
-
+ self._settings = QueueSettings(self)
self._current_index: Optional[int] = None
self._current_item_elapsed_time: int = 0
self._last_item: Optional[QueueItem] = None
self._update_task: Task = None
self._signal_next: bool = False
self._last_player_update: int = 0
- self._stream_url: str = self.mass.streams.get_stream_url(self.queue_id)
+ self._stream_url: str = ""
async def setup(self) -> None:
"""Handle async setup of instance."""
- await self._restore_saved_state()
+ await self._settings.restore()
+ await self._restore_items()
+ self._stream_url: str = self.mass.streams.get_stream_url(
+ self.queue_id, content_type=self._settings.stream_type
+ )
self.mass.signal_event(
MassEvent(EventType.QUEUE_ADDED, object_id=self.queue_id, data=self)
)
+ @property
+ def settings(self) -> QueueSettings:
+ """Return settings/preferences for this PlayerQueue."""
+ return self._settings
+
@property
def player(self) -> Player | PlayerGroup:
"""Return the player attached to this queue."""
return self.player.elapsed_time
return self._current_item_elapsed_time
- @property
- def repeat_enabled(self) -> bool:
- """Return if repeat is enabled."""
- return self._repeat_enabled
-
- @property
- def shuffle_enabled(self) -> bool:
- """Return if shuffle is enabled."""
- return self._shuffle_enabled
-
- @property
- def crossfade_duration(self) -> int:
- """Return crossfade duration (0 if disabled)."""
- return self._crossfade_duration
-
@property
def max_sample_rate(self) -> int:
- """Return the maximum supported sample rate this playerqueue supports."""
- if self.player.max_sample_rate is None:
- return 96000
- return self.player.max_sample_rate
+ """Return the maximum samplerate supported by this queue(player)."""
+ return max(self.player.supported_sample_rates)
@property
def items(self) -> List[QueueItem]:
return self._items[next_index]
return None
- @property
- def volume_normalization_enabled(self) -> bool:
- """Return bool if volume normalization is enabled for this queue."""
- return self._volume_normalization_enabled
-
- @property
- def volume_normalization_target(self) -> int:
- """Return volume target (in LUFS) for volume normalization for this queue."""
- return self._volume_normalization_target
-
def get_item(self, index: int) -> QueueItem | None:
"""Get queue item by index."""
if index is not None and len(self._items) > index:
if queue_opt == QueueOption.ADD:
return await self.append(queue_items)
- async def set_shuffle_enabled(self, enable_shuffle: bool) -> None:
- """Set shuffle."""
- if not self._shuffle_enabled and enable_shuffle:
- # shuffle requested
- self._shuffle_enabled = True
- if self._current_index is not None:
- played_items = self.items[: self._current_index]
- next_items = self.__shuffle_items(self.items[self._current_index + 1 :])
- items = played_items + [self.current_item] + next_items
- self.mass.signal_event(
- MassEvent(
- EventType.QUEUE_UPDATED, object_id=self.queue_id, data=self
- )
- )
- await self.update(items)
- elif self._shuffle_enabled and not enable_shuffle:
- # unshuffle
- self._shuffle_enabled = False
- if self._current_index is not None:
- played_items = self.items[: self._current_index]
- next_items = self.items[self._current_index + 1 :]
- next_items.sort(key=lambda x: x.sort_index, reverse=False)
- items = played_items + [self.current_item] + next_items
- self.mass.signal_event(
- MassEvent(
- EventType.QUEUE_UPDATED, object_id=self.queue_id, data=self
- )
- )
- await self.update(items)
-
- async def set_repeat_enabled(self, enable_repeat: bool) -> None:
- """Set the repeat mode for this queue."""
- if self._repeat_enabled != enable_repeat:
- self._repeat_enabled = enable_repeat
- self.mass.signal_event(
- MassEvent(EventType.QUEUE_UPDATED, object_id=self.queue_id, data=self)
- )
- await self._save_state(False)
-
- async def set_crossfade_duration(self, duration: int) -> None:
- """Set the crossfade duration for this queue, 0 to disable."""
- duration = max(duration, 10)
- if self._crossfade_duration != duration:
- self._crossfade_duration = duration
- self.mass.signal_event(
- MassEvent(EventType.QUEUE_UPDATED, object_id=self.queue_id, data=self)
- )
- await self._save_state(False)
-
- async def set_volume_normalization_enabled(self, enable: bool) -> None:
- """Set volume normalization."""
- if self._repeat_enabled != enable:
- self._repeat_enabled = enable
- self.mass.signal_event(
- MassEvent(EventType.QUEUE_UPDATED, object_id=self.queue_id, data=self)
- )
- await self._save_state(False)
-
- async def set_volume_normalization_target(self, target: int) -> None:
- """Set the target for the volume normalization in LUFS (default is -23)."""
- target = min(target, 0)
- target = max(target, -40)
- if self._volume_normalization_target != target:
- self._volume_normalization_target = target
- self.mass.signal_event(
- MassEvent(EventType.QUEUE_UPDATED, object_id=self.queue_id, data=self)
- )
- await self._save_state(False)
-
async def stop(self) -> None:
"""Stop command on queue player."""
# redirect to underlying player
self._current_index = index
self._next_start_index = index
# send stream url to player connected to this queue
- self._stream_url = self.mass.streams.get_stream_url(self.queue_id)
+ self._stream_url = self.mass.streams.get_stream_url(
+ self.queue_id, content_type=self._settings.stream_type
+ )
if self.player.use_multi_stream:
# multi stream enabled, all child players should receive the same audio stream
# redirect command to all (powered) players
+ # TODO: this assumes that all client players support flac
+ content_type = ContentType.FLAC
coros = []
expected_clients = set()
for child_id in self.player.group_childs:
if child_player := self.mass.players.get_player(child_id):
if child_player.powered:
+ # TODO: this assumes that all client players support flac
player_url = self.mass.streams.get_stream_url(
- self.queue_id, child_id
+ self.queue_id, child_id, content_type
)
expected_clients.add(child_id)
coros.append(child_player.play_url(player_url))
await self.mass.streams.start_multi_client_queue_stream(
- self.queue_id, expected_clients, ContentType.FLAC
+ # TODO: this assumes that all client players support flac
+ self.queue_id,
+ expected_clients,
+ content_type,
)
await asyncio.gather(*coros)
else:
"""Load (overwrite) queue with new items."""
for index, item in enumerate(queue_items):
item.sort_index = index
- if self._shuffle_enabled and len(queue_items) > 5:
- queue_items = self.__shuffle_items(queue_items)
+ if self.settings.shuffle_enabled and len(queue_items) > 5:
+ queue_items = random.sample(queue_items, len(queue_items))
self._items = queue_items
- self.mass.signal_event(
- MassEvent(EventType.QUEUE_ITEMS_UPDATED, object_id=self.queue_id, data=self)
- )
await self.play_index(0)
- await self._save_state()
+ self.signal_update(True)
async def insert(self, queue_items: List[QueueItem], offset: int = 0) -> None:
"""
insert_at_index = self._current_index + offset
for index, item in enumerate(queue_items):
item.sort_index = insert_at_index + index
- if self.shuffle_enabled and len(queue_items) > 5:
- queue_items = self.__shuffle_items(queue_items)
+ if self.settings.shuffle_enabled and len(queue_items) > 5:
+ queue_items = random.sample(queue_items, len(queue_items))
if offset == 0:
# replace current item with new
self._items = (
if offset == 0:
await self.play_index(insert_at_index)
- self.mass.signal_event(
- MassEvent(EventType.QUEUE_ITEMS_UPDATED, object_id=self.queue_id, data=self)
- )
- await self._save_state()
+ self.signal_update(True)
async def append(self, queue_items: List[QueueItem]) -> None:
"""Append new items at the end of the queue."""
for index, item in enumerate(queue_items):
item.sort_index = len(self.items) + index
- if self.shuffle_enabled:
+ if self.settings.shuffle_enabled:
played_items = self.items[: self._current_index]
next_items = self.items[self._current_index + 1 :] + queue_items
- next_items = self.__shuffle_items(next_items)
+ next_items = random.sample(next_items, len(next_items))
items = played_items + [self.current_item] + next_items
await self.update(items)
return
self._items = self._items + queue_items
- self.mass.signal_event(
- MassEvent(EventType.QUEUE_ITEMS_UPDATED, object_id=self.queue_id, data=self)
- )
- await self._save_state()
+ self.signal_update(True)
async def update(self, queue_items: List[QueueItem]) -> None:
"""Update the existing queue items, mostly caused by reordering."""
self._items = queue_items
- self.mass.signal_event(
- MassEvent(EventType.QUEUE_ITEMS_UPDATED, object_id=self.queue_id, data=self)
- )
- await self._save_state()
+ self.signal_update(True)
async def clear(self) -> None:
"""Clear all items in the queue."""
self._current_item_elapsed_time = int(track_time)
if new_item_loaded:
- self.mass.create_task(self._save_state())
- self.mass.signal_event(
- MassEvent(EventType.QUEUE_UPDATED, object_id=self.queue_id, data=self)
- )
+ self.signal_update()
if abs(prev_item_time - self._current_item_elapsed_time) >= 1:
self.mass.signal_event(
MassEvent(
if index is None:
# guard just in case
return 0
+ if self.settings.repeat_mode == RepeatMode.SINGLE:
+ return index
if len(self._items) > (index + 1):
return index + 1
- if self.repeat_enabled:
+ if self.settings.repeat_mode == RepeatMode.ALL:
# repeat enabled, start queue at beginning
return 0
return None
"""Indicate that queue stream needs to start next index once playback finished."""
self._signal_next = True
+ def signal_update(self, items_changed: bool = False) -> None:
+ """Signal state changed of this queue."""
+ if items_changed:
+ self.mass.create_task(self._save_items())
+ self.mass.signal_event(
+ MassEvent(
+ EventType.QUEUE_ITEMS_UPDATED, object_id=self.queue_id, data=self
+ )
+ )
+ else:
+ self.mass.signal_event(
+ MassEvent(EventType.QUEUE_UPDATED, object_id=self.queue_id, data=self)
+ )
+
def to_dict(self) -> Dict[str, Any]:
"""Export object to dict."""
cur_item = self.current_item.to_dict() if self.current_item else None
"current_index": self.current_index,
"current_item": cur_item,
"next_item": next_item,
- "shuffle_enabled": self.shuffle_enabled,
- "repeat_enabled": self.repeat_enabled,
- "volume_normalization_enabled": self.volume_normalization_enabled,
- "volume_normalization_target": self.volume_normalization_target,
- "crossfade_duration": self.crossfade_duration,
+ "settings": self.settings.to_dict(),
}
def __get_queue_stream_index(self) -> Tuple[int, int]:
break
return queue_index, track_time
- @staticmethod
- def __shuffle_items(queue_items: List[QueueItem]) -> List[QueueItem]:
- """Shuffle a list of tracks."""
- # for now we use default python random function
- # can be extended with some more magic based on last_played and stuff
- return random.sample(queue_items, len(queue_items))
-
- async def _restore_saved_state(self) -> None:
- """Try to load the saved state from database."""
- if db_row := await self.mass.database.get_row(
- "queue_settings", {"queue_id": self.queue_id}
- ):
- self._shuffle_enabled = bool(db_row["shuffle_enabled"])
- self._repeat_enabled = bool(db_row["repeat_enabled"])
- self._crossfade_duration = db_row["crossfade_duration"]
+ async def _restore_items(self) -> None:
+ """Try to load the saved state from cache."""
if queue_cache := await self.mass.cache.get(f"queue_items.{self.queue_id}"):
- self._items = [QueueItem.from_dict(x) for x in queue_cache["items"]]
- self._current_index = queue_cache["current_index"]
-
- async def _save_state(self, save_items: bool = True) -> None:
- """Save state in database."""
- # save queue settings in db
- await self.mass.database.insert_or_replace(
- "queue_settings",
+ try:
+ self._items = [QueueItem.from_dict(x) for x in queue_cache["items"]]
+ self._current_index = queue_cache["current_index"]
+ except (KeyError, AttributeError, TypeError) as err:
+ self.logger.warning(
+ "Unable to restore queue state for queue %s",
+ self.queue_id,
+ exc_info=err,
+ )
+ await self.settings.restore()
+
+ async def _save_items(self) -> None:
+ """Save current queue items/state in cache."""
+ await self.mass.cache.set(
+ f"queue_items.{self.queue_id}",
{
- "queue_id": self.queue_id,
- "shuffle_enabled": self._shuffle_enabled,
- "repeat_enabled": self.repeat_enabled,
- "crossfade_duration": self._crossfade_duration,
- "volume_normalization_enabled": self._volume_normalization_enabled,
- "volume_normalization_target": self._volume_normalization_target,
+ "items": [x.to_dict() for x in self._items],
+ "current_index": self._current_index,
},
)
-
- # store current items in cache
- if save_items:
- await self.mass.cache.set(
- f"queue_items.{self.queue_id}",
- {
- "items": [x.to_dict() for x in self._items],
- "current_index": self._current_index,
- },
- )