From: Marcel van der Veldt Date: Sat, 7 May 2022 09:02:15 +0000 (+0200) Subject: Refactor queue settings (#281) X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=4d2690f1fd662707b1f7df4ced6a48e27d42cb19;p=music-assistant-server.git Refactor queue settings (#281) - easier save/restore of player preferences - future proofing - handle stream transcoding based on player capabilities --- diff --git a/examples/full.py b/examples/full.py index a2e7a166..ef9e1dc6 100644 --- a/examples/full.py +++ b/examples/full.py @@ -6,6 +6,7 @@ import os from music_assistant.mass import MusicAssistant from music_assistant.models.player import Player, PlayerState +from music_assistant.models.player_queue import RepeatMode from music_assistant.providers.filesystem import FileSystemProvider from music_assistant.providers.qobuz import QobuzProvider from music_assistant.providers.spotify import SpotifyProvider @@ -168,8 +169,8 @@ async def main(): await mass.players.register_player(test_player1) await mass.players.register_player(test_player2) # try to play some playlist - await test_player1.active_queue.set_crossfade_duration(10) - await test_player1.active_queue.set_shuffle_enabled(True) + test_player1.active_queue.settings.shuffle_enabled = True + test_player1.active_queue.settings.repeat_mode = RepeatMode.ALL if len(playlists) > 0: await test_player1.active_queue.play_media(playlists[0].uri) diff --git a/music_assistant/controllers/players.py b/music_assistant/controllers/players.py index 07b4f489..53b83014 100755 --- a/music_assistant/controllers/players.py +++ b/music_assistant/controllers/players.py @@ -26,16 +26,7 @@ class PlayerController: async def setup(self) -> None: """Async initialize of module.""" - async with self.mass.database.get_db() as _db: - await _db.execute( - """CREATE TABLE IF NOT EXISTS queue_settings( - queue_id TEXT UNIQUE, - crossfade_duration INTEGER, - shuffle_enabled BOOLEAN, - repeat_enabled BOOLEAN, - volume_normalization_enabled BOOLEAN, - volume_normalization_target INTEGER)""" - ) + # nothing to setup (yet) async def cleanup(self) -> None: """Cleanup on exit.""" diff --git a/music_assistant/controllers/stream.py b/music_assistant/controllers/stream.py index 043c1dae..58234d96 100644 --- a/music_assistant/controllers/stream.py +++ b/music_assistant/controllers/stream.py @@ -22,8 +22,8 @@ from music_assistant.helpers.process import AsyncProcess from music_assistant.helpers.typing import MusicAssistant from music_assistant.helpers.util import get_ip from music_assistant.models.errors import MediaNotFoundError, MusicAssistantError -from music_assistant.models.media_items import ContentType -from music_assistant.models.player_queue import PlayerQueue +from music_assistant.models.media_items import ContentType, MediaType +from music_assistant.models.player_queue import CrossFadeMode, PlayerQueue, QueueItem class StreamController: @@ -41,12 +41,16 @@ class StreamController: self._time_started: Dict[str, float] = {} def get_stream_url( - self, queue_id: str, child_player: Optional[str] = None, fmt: str = "flac" + self, + queue_id: str, + child_player: Optional[str] = None, + content_type: ContentType = ContentType.FLAC, ) -> str: """Return the full stream url for the PlayerQueue Stream.""" + ext = content_type.value if child_player: - return f"http://{self._ip}:{self._port}/{queue_id}/{child_player}.{fmt}" - return f"http://{self._ip}:{self._port}/{queue_id}.{fmt}" + return f"http://{self._ip}:{self._port}/{queue_id}/{child_player}.{ext}" + return f"http://{self._ip}:{self._port}/{queue_id}.{ext}" async def get_preview_url(self, provider: str, track_id: str) -> str: """Return url to short preview sample.""" @@ -84,16 +88,20 @@ class StreamController: self.mass.subscribe(on_shutdown_event, EventType.SHUTDOWN) sox_present, ffmpeg_present = await check_audio_support(True) - if not ffmpeg_present: + if not ffmpeg_present and not sox_present: self.logger.error( + "SoX or FFmpeg binary not found on your system, " + "playback will NOT work!." + ) + elif not ffmpeg_present: + self.logger.warning( "The FFmpeg binary was not found on your system, " - "you might have issues with playback. " + "you might experience issues with playback. " "Please install FFmpeg with your OS package manager.", ) elif not sox_present: self.logger.warning( - "The SoX binary was not found on your system so FFmpeg is used as fallback. " - "For best audio quality, please install SoX with your OS package manager.", + "The SoX binary was not found on your system, FFmpeg is used as fallback." ) self.logger.info("Started stream server on port %s", self._port) @@ -127,13 +135,24 @@ class StreamController: start_streamdetails = await queue.queue_stream_prepare() output_fmt = ContentType(fmt) + # work out sample rate + if queue.settings.crossfade_mode == CrossFadeMode.ALWAYS: + sample_rate = min(96000, queue.max_sample_rate) + bit_depth = 24 + channels = 2 + resample = True + else: + sample_rate = start_streamdetails.sample_rate + bit_depth = start_streamdetails.bit_depth + channels = start_streamdetails.channels + resample = False sox_args = await get_sox_args_for_pcm_stream( - start_streamdetails.sample_rate, - start_streamdetails.bit_depth, - start_streamdetails.channels, + sample_rate, + bit_depth, + channels, output_format=output_fmt, ) - # get the raw pcm bytes from the queue stream and on the fly encode as to wanted format + # get the raw pcm bytes from the queue stream and on the fly encode to wanted format # send the compressed/endoded stream to the client. async with AsyncProcess(sox_args, True) as sox_proc: @@ -144,6 +163,7 @@ class StreamController: sample_rate=start_streamdetails.sample_rate, bit_depth=start_streamdetails.bit_depth, channels=start_streamdetails.channels, + resample=resample, ): if sox_proc.closed: return @@ -160,7 +180,7 @@ class StreamController: return resp async def serve_multi_client_queue_stream(self, request: web.Request): - """Serve queue audio stream to multiple (group)clients in the raw PCM format.""" + """Serve queue audio stream to multiple (group)clients.""" queue_id = request.match_info["queue_id"] player_id = request.match_info["player_id"] fmt = request.match_info.get("format", "flac") @@ -274,16 +294,27 @@ class StreamController: queue = self.mass.players.get_player_queue(queue_id) start_streamdetails = await queue.queue_stream_prepare() + # work out sample rate + if queue.settings.crossfade_mode == CrossFadeMode.ALWAYS: + sample_rate = min(96000, queue.max_sample_rate) + bit_depth = 24 + channels = 2 + resample = True + else: + sample_rate = start_streamdetails.sample_rate + bit_depth = start_streamdetails.bit_depth + channels = start_streamdetails.channels + resample = False sox_args = await get_sox_args_for_pcm_stream( - start_streamdetails.sample_rate, - start_streamdetails.bit_depth, - start_streamdetails.channels, + sample_rate, + bit_depth, + channels, output_format=output_fmt, ) self.logger.debug("Multi client queue stream %s started", queue.queue_id) try: - # get the raw pcm bytes from the queue stream and on the fly encode as to wanted format + # get the raw pcm bytes from the queue stream and on the fly encode to wanted format # send the compressed/endoded stream to the client. async with AsyncProcess(sox_args, True) as sox_proc: @@ -294,6 +325,7 @@ class StreamController: sample_rate=start_streamdetails.sample_rate, bit_depth=start_streamdetails.bit_depth, channels=start_streamdetails.channels, + resample=resample, ): if sox_proc.closed: return @@ -362,6 +394,7 @@ class StreamController: last_fadeout_data = b"" queue_index = None track_count = 0 + prev_track: Optional[QueueItem] = None pcm_fmt = ContentType.from_bit_depth(bit_depth) self.logger.info( @@ -400,7 +433,6 @@ class StreamController: if not resample and streamdetails.bit_depth > bit_depth: await queue.queue_stream_signal_next() self.logger.info("Abort queue stream due to bit depth mismatch") - await queue.queue_stream_signal_next() break if ( not resample @@ -411,10 +443,25 @@ class StreamController: await queue.queue_stream_signal_next() break + # check crossfade ability + use_crossfade = queue.settings.crossfade_mode != CrossFadeMode.DISABLED + if ( + prev_track is not None + and prev_track.media_type == MediaType.TRACK + and queue_track.media_type == MediaType.TRACK + ): + prev_item = await self.mass.music.get_item_by_uri(prev_track.uri) + new_item = await self.mass.music.get_item_by_uri(queue_track.uri) + if ( + prev_item.album is not None + and new_item.album is not None + and prev_item.album == new_item.album + ): + use_crossfade = False + prev_track = queue_track + sample_size = int(sample_rate * (bit_depth / 8) * channels) # 1 second - buffer_size = sample_size * ( - queue.crossfade_duration or 1 - ) # 1...10 seconds + buffer_size = sample_size * (queue.settings.crossfade_duration or 2) self.logger.debug( "Start Streaming queue track: %s (%s) for queue %s", @@ -496,10 +543,10 @@ class StreamController: # part is too short after the strip action # so we just use the entire original data last_part = prev_chunk + chunk - if not queue.crossfade_duration or len(last_part) < buffer_size: + if not use_crossfade or len(last_part) < buffer_size: # crossfading is not enabled or not enough data, # so just pass the (stripped) audio data - if queue.crossfade_duration: + if use_crossfade: self.logger.warning( "Not enough data for crossfade: %s", len(last_part) ) diff --git a/music_assistant/helpers/audio.py b/music_assistant/helpers/audio.py index 0ffd4929..b8807a5f 100644 --- a/music_assistant/helpers/audio.py +++ b/music_assistant/helpers/audio.py @@ -241,9 +241,9 @@ async def get_gain_correct( ) -> Tuple[float, float]: """Get gain correction for given queue / track combination.""" queue = mass.players.get_player_queue(queue_id) - if not queue or not queue.volume_normalization_enabled: + if not queue or not queue.settings.volume_normalization_enabled: return 0 - target_gain = queue.volume_normalization_target + target_gain = queue.settings.volume_normalization_target track_loudness = await mass.music.get_track_loudness(item_id, provider_id) if track_loudness is None: # fallback to provider average @@ -525,7 +525,7 @@ async def get_sox_args_for_pcm_stream( floating_point: bool = False, output_format: ContentType = ContentType.FLAC, ) -> List[str]: - """Collect args for aox (or ffmpeg) when converting from raw pcm to another contenttype.""" + """Collect args for sox (or ffmpeg) when converting from raw pcm to another contenttype.""" sox_present, ffmpeg_present = await check_audio_support() input_format = ContentType.from_bit_depth(bit_depth, floating_point) diff --git a/music_assistant/helpers/database.py b/music_assistant/helpers/database.py index 0f652745..6bf0e770 100755 --- a/music_assistant/helpers/database.py +++ b/music_assistant/helpers/database.py @@ -11,7 +11,7 @@ from music_assistant.helpers.typing import MusicAssistant # pylint: disable=invalid-name -SCHEMA_VERSION = 4 +SCHEMA_VERSION = 5 TABLE_PROV_MAPPINGS = "provider_mappings" TABLE_TRACK_LOUDNESS = "track_loudness" @@ -22,6 +22,7 @@ TABLE_TRACKS = "tracks" TABLE_PLAYLISTS = "playlists" TABLE_RADIOS = "radios" TABLE_CACHE = "cache" +TABLE_SETTINGS = "settings" class Database: @@ -53,6 +54,18 @@ class Database: async with Db(self.url, timeout=360) as _db: yield _db + async def get_setting(self, key: str, db: Optional[Db] = None) -> str | None: + """Get setting from settings table.""" + return await self.get_row(TABLE_SETTINGS, {"key": key}, db=db) + + async def set_setting(self, key: str, value: str, db: Optional[Db] = None) -> None: + """Set setting in settings table.""" + if not isinstance(value, str): + value = str(value) + return await self.insert_or_replace( + TABLE_SETTINGS, {"key": key, "value": value} + ) + async def get_rows( self, table: str, @@ -150,138 +163,140 @@ class Database: async def _migrate(self): """Perform database migration actions if needed.""" - prev_version = await self.get_row("settings", {"key": "version"}) - if prev_version: - prev_version = int(prev_version["value"]) - else: - prev_version = 0 - if SCHEMA_VERSION != prev_version: - self.logger.info( - "Performing database migration from %s to %s", - prev_version, - SCHEMA_VERSION, - ) + async with self.get_db() as db: + if await self.exists(TABLE_SETTINGS, db): + prev_version = await self.get_setting("version", db) + prev_version = int(prev_version["value"]) + else: + prev_version = 0 - if prev_version < 3: - # schema version 3: too many breaking changes, rebuild db - async with self.get_db() as _db: - await _db.execute(f"DROP TABLE IF EXISTS {TABLE_ARTISTS}") - await _db.execute(f"DROP TABLE IF EXISTS {TABLE_ALBUMS}") - await _db.execute(f"DROP TABLE IF EXISTS {TABLE_TRACKS}") - await _db.execute(f"DROP TABLE IF EXISTS {TABLE_PLAYLISTS}") - await _db.execute(f"DROP TABLE IF EXISTS {TABLE_RADIOS}") - await _db.execute(f"DROP TABLE IF EXISTS {TABLE_PROV_MAPPINGS}") - await _db.execute(f"DROP TABLE IF EXISTS {TABLE_CACHE}") + if SCHEMA_VERSION != prev_version: + self.logger.info( + "Performing database migration from %s to %s", + prev_version, + SCHEMA_VERSION, + ) - if prev_version < 4: - # schema version 4: add album to tracks table - async with self.get_db() as _db: - await _db.execute("DROP TABLE IF EXISTS tracks") - if await self.exists(TABLE_PROV_MAPPINGS, _db): + if prev_version < 3: + # schema version 3: too many breaking changes, rebuild db + await db.execute(f"DROP TABLE IF EXISTS {TABLE_ARTISTS}") + await db.execute(f"DROP TABLE IF EXISTS {TABLE_ALBUMS}") + await db.execute(f"DROP TABLE IF EXISTS {TABLE_TRACKS}") + await db.execute(f"DROP TABLE IF EXISTS {TABLE_PLAYLISTS}") + await db.execute(f"DROP TABLE IF EXISTS {TABLE_RADIOS}") + await db.execute(f"DROP TABLE IF EXISTS {TABLE_PROV_MAPPINGS}") + await db.execute(f"DROP TABLE IF EXISTS {TABLE_CACHE}") + + if prev_version < 4: + # schema version 4: add album to tracks table + await db.execute("DROP TABLE IF EXISTS tracks") + if await self.exists(TABLE_PROV_MAPPINGS, db): await self.delete( - TABLE_PROV_MAPPINGS, {"media_type": "track"}, db=_db + TABLE_PROV_MAPPINGS, {"media_type": "track"}, db=db ) - # create db tables - await self.__create_database_tables() - # store current schema version - await self.insert_or_replace( - "settings", {"key": "version", "value": str(SCHEMA_VERSION)} - ) + if prev_version < 5: + # delete player_settings table: use generic settings table instead + await db.execute("DROP TABLE IF EXISTS queue_settings") + + # create db tables + await self.__create_database_tables(db) + # store current schema version + await self.set_setting("version", str(SCHEMA_VERSION), db=db) - async def __create_database_tables(self) -> None: + @staticmethod + async def __create_database_tables(db: Db) -> None: """Init generic database tables.""" - async with self.mass.database.get_db() as _db: - await _db.execute( - f"""CREATE TABLE IF NOT EXISTS {TABLE_PROV_MAPPINGS}( - item_id INTEGER NOT NULL, - media_type TEXT NOT NULL, - prov_item_id TEXT NOT NULL, - provider TEXT NOT NULL, - quality INTEGER NULL, - details TEXT NULL, - url TEXT NULL, - UNIQUE(item_id, media_type, prov_item_id, provider) - );""" - ) - await _db.execute( - f"""CREATE TABLE IF NOT EXISTS {TABLE_TRACK_LOUDNESS}( - item_id INTEGER NOT NULL, - provider TEXT NOT NULL, - loudness REAL, - UNIQUE(item_id, provider));""" - ) - await _db.execute( - f"""CREATE TABLE IF NOT EXISTS {TABLE_PLAYLOG}( + await db.execute( + f"""CREATE TABLE IF NOT EXISTS {TABLE_PROV_MAPPINGS}( item_id INTEGER NOT NULL, + media_type TEXT NOT NULL, + prov_item_id TEXT NOT NULL, provider TEXT NOT NULL, - timestamp REAL, - UNIQUE(item_id, provider));""" - ) - await _db.execute( - f"""CREATE TABLE IF NOT EXISTS {TABLE_ALBUMS}( - item_id INTEGER PRIMARY KEY AUTOINCREMENT, - name TEXT NOT NULL, - sort_name TEXT NOT NULL, - album_type TEXT, - year INTEGER, - version TEXT, - in_library BOOLEAN DEFAULT 0, - upc TEXT, - musicbrainz_id TEXT, - artist json, - metadata json, - provider_ids json - );""" - ) - await _db.execute( - f"""CREATE TABLE IF NOT EXISTS {TABLE_ARTISTS}( - item_id INTEGER PRIMARY KEY AUTOINCREMENT, - name TEXT NOT NULL, - sort_name TEXT NOT NULL, - musicbrainz_id TEXT NOT NULL UNIQUE, - in_library BOOLEAN DEFAULT 0, - metadata json, - provider_ids json - );""" - ) - await _db.execute( - f"""CREATE TABLE IF NOT EXISTS {TABLE_TRACKS}( - item_id INTEGER PRIMARY KEY AUTOINCREMENT, - name TEXT NOT NULL, - sort_name TEXT NOT NULL, - version TEXT, - duration INTEGER, - in_library BOOLEAN DEFAULT 0, - isrc TEXT, - musicbrainz_id TEXT, - artists json, - album json, - metadata json, - provider_ids json - );""" - ) - await _db.execute( - f"""CREATE TABLE IF NOT EXISTS {TABLE_PLAYLISTS}( - item_id INTEGER PRIMARY KEY AUTOINCREMENT, - name TEXT NOT NULL, - sort_name TEXT NOT NULL, - owner TEXT NOT NULL, - is_editable BOOLEAN NOT NULL, - checksum TEXT NOT NULL, - in_library BOOLEAN DEFAULT 0, - metadata json, - provider_ids json, - UNIQUE(name, owner) + quality INTEGER NULL, + details TEXT NULL, + url TEXT NULL, + UNIQUE(item_id, media_type, prov_item_id, provider) );""" - ) - await _db.execute( - f"""CREATE TABLE IF NOT EXISTS {TABLE_RADIOS}( - item_id INTEGER PRIMARY KEY AUTOINCREMENT, - name TEXT NOT NULL UNIQUE, - sort_name TEXT NOT NULL, - in_library BOOLEAN DEFAULT 0, - metadata json, - provider_ids json + ) + await db.execute( + f"""CREATE TABLE IF NOT EXISTS {TABLE_TRACK_LOUDNESS}( + item_id INTEGER NOT NULL, + provider TEXT NOT NULL, + loudness REAL, + UNIQUE(item_id, provider));""" + ) + await db.execute( + f"""CREATE TABLE IF NOT EXISTS {TABLE_PLAYLOG}( + item_id INTEGER NOT NULL, + provider TEXT NOT NULL, + timestamp REAL, + UNIQUE(item_id, provider));""" + ) + await db.execute( + f"""CREATE TABLE IF NOT EXISTS {TABLE_ALBUMS}( + item_id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT NOT NULL, + sort_name TEXT NOT NULL, + album_type TEXT, + year INTEGER, + version TEXT, + in_library BOOLEAN DEFAULT 0, + upc TEXT, + musicbrainz_id TEXT, + artist json, + metadata json, + provider_ids json + );""" + ) + await db.execute( + f"""CREATE TABLE IF NOT EXISTS {TABLE_ARTISTS}( + item_id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT NOT NULL, + sort_name TEXT NOT NULL, + musicbrainz_id TEXT NOT NULL UNIQUE, + in_library BOOLEAN DEFAULT 0, + metadata json, + provider_ids json );""" - ) + ) + await db.execute( + f"""CREATE TABLE IF NOT EXISTS {TABLE_TRACKS}( + item_id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT NOT NULL, + sort_name TEXT NOT NULL, + version TEXT, + duration INTEGER, + in_library BOOLEAN DEFAULT 0, + isrc TEXT, + musicbrainz_id TEXT, + artists json, + album json, + metadata json, + provider_ids json + );""" + ) + await db.execute( + f"""CREATE TABLE IF NOT EXISTS {TABLE_PLAYLISTS}( + item_id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT NOT NULL, + sort_name TEXT NOT NULL, + owner TEXT NOT NULL, + is_editable BOOLEAN NOT NULL, + checksum TEXT NOT NULL, + in_library BOOLEAN DEFAULT 0, + metadata json, + provider_ids json, + UNIQUE(name, owner) + );""" + ) + await db.execute( + f"""CREATE TABLE IF NOT EXISTS {TABLE_RADIOS}( + item_id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT NOT NULL UNIQUE, + sort_name TEXT NOT NULL, + in_library BOOLEAN DEFAULT 0, + metadata json, + provider_ids json + );""" + ) diff --git a/music_assistant/models/player.py b/music_assistant/models/player.py index dfc89504..5ff4469a 100755 --- a/music_assistant/models/player.py +++ b/music_assistant/models/player.py @@ -5,18 +5,37 @@ import asyncio from abc import ABC from dataclasses import dataclass from enum import Enum -from typing import TYPE_CHECKING, Any, Dict, List +from typing import TYPE_CHECKING, Any, Dict, List, Tuple from mashumaro import DataClassDictMixin from music_assistant.constants import EventType, MassEvent from music_assistant.helpers.typing import MusicAssistant from music_assistant.helpers.util import get_changed_keys +from music_assistant.models.media_items import ContentType if TYPE_CHECKING: from .player_queue import PlayerQueue +DEFAULT_SUPPORTED_CONTENT_TYPES = ( + # if a player does not report/set its supported content types, we use a pretty safe default + ContentType.FLAC, + ContentType.MP3, + ContentType.WAV, + ContentType.PCM_S16LE, + ContentType.PCM_S24LE, +) + +DEFAULT_SUPPORTED_SAMPLE_RATES = ( + # if a player does not report/set its supported sample rates, we use a pretty safe default + 44100, + 48000, + 88200, + 96000, +) + + class PlayerState(Enum): """Enum for the (playback)state of a player.""" @@ -49,7 +68,8 @@ class Player(ABC): _attr_available: bool = True _attr_volume_level: int = 100 _attr_device_info: DeviceInfo = DeviceInfo() - _attr_max_sample_rate: int = 96000 + _attr_supported_content_types: Tuple[ContentType] = DEFAULT_SUPPORTED_CONTENT_TYPES + _attr_supported_sample_rates: Tuple[int] = DEFAULT_SUPPORTED_SAMPLE_RATES _attr_active_queue_id: str = "" _attr_use_multi_stream: bool = False # below objects will be set by playermanager at register/update @@ -116,9 +136,14 @@ class Player(ABC): return self._attr_device_info @property - def max_sample_rate(self) -> int: - """Return the maximum supported sample rate this player supports.""" - return self._attr_max_sample_rate + def supported_sample_rates(self) -> Tuple[int]: + """Return the sample rates this player supports.""" + return self._attr_supported_sample_rates + + @property + def supported_content_types(self) -> Tuple[ContentType]: + """Return the content types this player supports.""" + return self._attr_supported_content_types @property def active_queue(self) -> PlayerQueue: diff --git a/music_assistant/models/player_queue.py b/music_assistant/models/player_queue.py index f9cddc49..002ec77b 100644 --- a/music_assistant/models/player_queue.py +++ b/music_assistant/models/player_queue.py @@ -83,6 +83,192 @@ class QueueItem(DataClassDictMixin): ) +class CrossFadeMode(Enum): + """Enum with crossfade modes.""" + + DISABLED = "disabled" # no crossfading at all + STRICT = "strict" # do not crossfade tracks of same album + SMART = "smart" # crossfade if possible (do not crossfade different sample rates) + ALWAYS = "always" # all tracks - resample to fixed sample rate + + +class RepeatMode(Enum): + """Enum with repeat modes.""" + + DISABLED = "disabled" # no repeat at all + SINGLE = "single" # repeat current/single track + ALL = "all" # repeat entire queue + + +class QueueSettings: + """Representation of (user adjustable) PlayerQueue settings/preferences.""" + + def __init__(self, queue: PlayerQueue) -> None: + """Initialize.""" + self._queue = queue + self.mass = queue.mass + self._repeat_mode: RepeatMode = RepeatMode.DISABLED + self._shuffle_enabled: bool = False + self._crossfade_mode: CrossFadeMode = CrossFadeMode.DISABLED + self._crossfade_duration: int = 6 + self._volume_normalization_enabled: bool = True + self._volume_normalization_target: int = -23 + + @property + def repeat_mode(self) -> RepeatMode: + """Return repeat enabled setting.""" + return self._repeat_mode + + @repeat_mode.setter + def repeat_mode(self, enabled: bool) -> None: + """Set repeat enabled setting.""" + if self._repeat_mode != enabled: + self._repeat_mode = enabled + self._on_update("repeat_mode") + + @property + def shuffle_enabled(self) -> bool: + """Return shuffle enabled setting.""" + return self._shuffle_enabled + + @shuffle_enabled.setter + def shuffle_enabled(self, enabled: bool) -> None: + """Set shuffle enabled setting.""" + if not self._shuffle_enabled and enabled: + # shuffle requested + self._shuffle_enabled = True + if self._queue.current_index is not None: + played_items = self._queue.items[: self._queue.current_index] + next_items = self._queue.items[self._queue.current_index + 1 :] + # for now we use default python random function + # can be extended with some more magic based on last_played and stuff + next_items = random.sample(next_items, len(next_items)) + items = played_items + [self._queue.current_item] + next_items + asyncio.create_task(self._queue.update(items)) + self._on_update("shuffle_enabled") + elif self._shuffle_enabled and not enabled: + # unshuffle + self._shuffle_enabled = False + if self._queue.current_index is not None: + played_items = self._queue.items[: self._queue.current_index] + next_items = self._queue.items[self._queue.current_index + 1 :] + next_items.sort(key=lambda x: x.sort_index, reverse=False) + items = played_items + [self._queue.current_item] + next_items + asyncio.create_task(self._queue.update(items)) + self._on_update("shuffle_enabled") + + @property + def crossfade_mode(self) -> CrossFadeMode: + """Return crossfade mode setting.""" + return self._crossfade_mode + + @crossfade_mode.setter + def crossfade_mode(self, mode: CrossFadeMode) -> None: + """Set crossfade enabled setting.""" + if self._crossfade_mode != mode: + # TODO: restart the queue stream if its playing + self._crossfade_mode = mode + self._on_update("crossfade_mode") + + @property + def crossfade_duration(self) -> int: + """Return crossfade_duration setting.""" + return self._crossfade_duration + + @crossfade_duration.setter + def crossfade_duration(self, duration: int) -> None: + """Set crossfade_duration setting (1..10 seconds).""" + duration = max(1, duration) + duration = min(10, duration) + if self._crossfade_duration != duration: + self._crossfade_duration = duration + self._on_update("crossfade_duration") + + @property + def volume_normalization_enabled(self) -> bool: + """Return volume_normalization_enabled setting.""" + return self._volume_normalization_enabled + + @volume_normalization_enabled.setter + def volume_normalization_enabled(self, enabled: bool) -> None: + """Set volume_normalization_enabled setting.""" + if self._volume_normalization_enabled != enabled: + self._volume_normalization_enabled = enabled + self._on_update("volume_normalization_enabled") + self.save() + + @property + def volume_normalization_target(self) -> float: + """Return volume_normalization_target setting.""" + return self._volume_normalization_target + + @volume_normalization_target.setter + def volume_normalization_target(self, target: float) -> None: + """Set volume_normalization_target setting (-40..10 LUFS).""" + target = max(-40, target) + target = min(10, target) + if self._volume_normalization_target != target: + self._volume_normalization_target = target + self._on_update("volume_normalization_target") + + @property + def stream_type(self) -> ContentType: + """Return supported/preferred stream type for playerqueue. Read only.""" + # determine default stream type from player capabilities + return next( + x + for x in ( + ContentType.FLAC, + ContentType.WAV, + ContentType.PCM_S16LE, + ContentType.MP3, + ContentType.MPEG, + ) + if x in self._queue.player.supported_content_types + ) + + def to_dict(self) -> Dict[str, Any]: + """Return dict from settings.""" + return { + "repeat_mode": self.repeat_mode.value, + "shuffle_enabled": self.shuffle_enabled, + "crossfade_mode": self.crossfade_mode, + "crossfade_duration": self.crossfade_duration, + "volume_normalization_enabled": self.volume_normalization_enabled, + "volume_normalization_target": self.volume_normalization_target, + } + + async def restore(self) -> None: + """Restore state from db.""" + async with self.mass.database.get_db() as _db: + for key, val_type in ( + ("repeat_mode", RepeatMode), + ("crossfade_mode", CrossFadeMode), + ("shuffle_enabled", bool), + ("crossfade_duration", int), + ("volume_normalization_enabled", bool), + ("volume_normalization_target", float), + ): + db_key = f"{self._queue.queue_id}_{key}" + if db_value := await self.mass.database.get_setting(db_key, db=_db): + value = val_type(db_value["value"]) + setattr(self, f"_{key}", value) + + def _on_update(self, changed_key: Optional[str] = None) -> None: + """Handle state changed.""" + self._queue.signal_update() + self.mass.create_task(self.save(changed_key)) + # TODO: restart play if setting changed that impacts playing queue + + async def save(self, changed_key: Optional[str] = None) -> None: + """Save state in db.""" + async with self.mass.database.get_db() as _db: + for key, value in self.to_dict().items(): + if key == changed_key or changed_key is None: + db_key = f"{self._queue.queue_id}_{key}" + await self.mass.database.set_setting(db_key, value, db=_db) + + class PlayerQueue: """Represents a PlayerQueue object.""" @@ -91,13 +277,7 @@ class PlayerQueue: self.mass = mass self.logger = mass.players.logger self.queue_id = player_id - - self._shuffle_enabled: bool = False - self._repeat_enabled: bool = False - self._crossfade_duration: int = 0 - self._volume_normalization_enabled: bool = True - self._volume_normalization_target: int = -23 - + self._settings = QueueSettings(self) self._current_index: Optional[int] = None self._current_item_elapsed_time: int = 0 self._last_item: Optional[QueueItem] = None @@ -109,15 +289,24 @@ class PlayerQueue: self._update_task: Task = None self._signal_next: bool = False self._last_player_update: int = 0 - self._stream_url: str = self.mass.streams.get_stream_url(self.queue_id) + self._stream_url: str = "" async def setup(self) -> None: """Handle async setup of instance.""" - await self._restore_saved_state() + await self._settings.restore() + await self._restore_items() + self._stream_url: str = self.mass.streams.get_stream_url( + self.queue_id, content_type=self._settings.stream_type + ) self.mass.signal_event( MassEvent(EventType.QUEUE_ADDED, object_id=self.queue_id, data=self) ) + @property + def settings(self) -> QueueSettings: + """Return settings/preferences for this PlayerQueue.""" + return self._settings + @property def player(self) -> Player | PlayerGroup: """Return the player attached to this queue.""" @@ -142,27 +331,10 @@ class PlayerQueue: return self.player.elapsed_time return self._current_item_elapsed_time - @property - def repeat_enabled(self) -> bool: - """Return if repeat is enabled.""" - return self._repeat_enabled - - @property - def shuffle_enabled(self) -> bool: - """Return if shuffle is enabled.""" - return self._shuffle_enabled - - @property - def crossfade_duration(self) -> int: - """Return crossfade duration (0 if disabled).""" - return self._crossfade_duration - @property def max_sample_rate(self) -> int: - """Return the maximum supported sample rate this playerqueue supports.""" - if self.player.max_sample_rate is None: - return 96000 - return self.player.max_sample_rate + """Return the maximum samplerate supported by this queue(player).""" + return max(self.player.supported_sample_rates) @property def items(self) -> List[QueueItem]: @@ -198,16 +370,6 @@ class PlayerQueue: return self._items[next_index] return None - @property - def volume_normalization_enabled(self) -> bool: - """Return bool if volume normalization is enabled for this queue.""" - return self._volume_normalization_enabled - - @property - def volume_normalization_target(self) -> int: - """Return volume target (in LUFS) for volume normalization for this queue.""" - return self._volume_normalization_target - def get_item(self, index: int) -> QueueItem | None: """Get queue item by index.""" if index is not None and len(self._items) > index: @@ -299,75 +461,6 @@ class PlayerQueue: if queue_opt == QueueOption.ADD: return await self.append(queue_items) - async def set_shuffle_enabled(self, enable_shuffle: bool) -> None: - """Set shuffle.""" - if not self._shuffle_enabled and enable_shuffle: - # shuffle requested - self._shuffle_enabled = True - if self._current_index is not None: - played_items = self.items[: self._current_index] - next_items = self.__shuffle_items(self.items[self._current_index + 1 :]) - items = played_items + [self.current_item] + next_items - self.mass.signal_event( - MassEvent( - EventType.QUEUE_UPDATED, object_id=self.queue_id, data=self - ) - ) - await self.update(items) - elif self._shuffle_enabled and not enable_shuffle: - # unshuffle - self._shuffle_enabled = False - if self._current_index is not None: - played_items = self.items[: self._current_index] - next_items = self.items[self._current_index + 1 :] - next_items.sort(key=lambda x: x.sort_index, reverse=False) - items = played_items + [self.current_item] + next_items - self.mass.signal_event( - MassEvent( - EventType.QUEUE_UPDATED, object_id=self.queue_id, data=self - ) - ) - await self.update(items) - - async def set_repeat_enabled(self, enable_repeat: bool) -> None: - """Set the repeat mode for this queue.""" - if self._repeat_enabled != enable_repeat: - self._repeat_enabled = enable_repeat - self.mass.signal_event( - MassEvent(EventType.QUEUE_UPDATED, object_id=self.queue_id, data=self) - ) - await self._save_state(False) - - async def set_crossfade_duration(self, duration: int) -> None: - """Set the crossfade duration for this queue, 0 to disable.""" - duration = max(duration, 10) - if self._crossfade_duration != duration: - self._crossfade_duration = duration - self.mass.signal_event( - MassEvent(EventType.QUEUE_UPDATED, object_id=self.queue_id, data=self) - ) - await self._save_state(False) - - async def set_volume_normalization_enabled(self, enable: bool) -> None: - """Set volume normalization.""" - if self._repeat_enabled != enable: - self._repeat_enabled = enable - self.mass.signal_event( - MassEvent(EventType.QUEUE_UPDATED, object_id=self.queue_id, data=self) - ) - await self._save_state(False) - - async def set_volume_normalization_target(self, target: int) -> None: - """Set the target for the volume normalization in LUFS (default is -23).""" - target = min(target, 0) - target = max(target, -40) - if self._volume_normalization_target != target: - self._volume_normalization_target = target - self.mass.signal_event( - MassEvent(EventType.QUEUE_UPDATED, object_id=self.queue_id, data=self) - ) - await self._save_state(False) - async def stop(self) -> None: """Stop command on queue player.""" # redirect to underlying player @@ -429,23 +522,31 @@ class PlayerQueue: self._current_index = index self._next_start_index = index # send stream url to player connected to this queue - self._stream_url = self.mass.streams.get_stream_url(self.queue_id) + self._stream_url = self.mass.streams.get_stream_url( + self.queue_id, content_type=self._settings.stream_type + ) if self.player.use_multi_stream: # multi stream enabled, all child players should receive the same audio stream # redirect command to all (powered) players + # TODO: this assumes that all client players support flac + content_type = ContentType.FLAC coros = [] expected_clients = set() for child_id in self.player.group_childs: if child_player := self.mass.players.get_player(child_id): if child_player.powered: + # TODO: this assumes that all client players support flac player_url = self.mass.streams.get_stream_url( - self.queue_id, child_id + self.queue_id, child_id, content_type ) expected_clients.add(child_id) coros.append(child_player.play_url(player_url)) await self.mass.streams.start_multi_client_queue_stream( - self.queue_id, expected_clients, ContentType.FLAC + # TODO: this assumes that all client players support flac + self.queue_id, + expected_clients, + content_type, ) await asyncio.gather(*coros) else: @@ -478,14 +579,11 @@ class PlayerQueue: """Load (overwrite) queue with new items.""" for index, item in enumerate(queue_items): item.sort_index = index - if self._shuffle_enabled and len(queue_items) > 5: - queue_items = self.__shuffle_items(queue_items) + if self.settings.shuffle_enabled and len(queue_items) > 5: + queue_items = random.sample(queue_items, len(queue_items)) self._items = queue_items - self.mass.signal_event( - MassEvent(EventType.QUEUE_ITEMS_UPDATED, object_id=self.queue_id, data=self) - ) await self.play_index(0) - await self._save_state() + self.signal_update(True) async def insert(self, queue_items: List[QueueItem], offset: int = 0) -> None: """ @@ -501,8 +599,8 @@ class PlayerQueue: insert_at_index = self._current_index + offset for index, item in enumerate(queue_items): item.sort_index = insert_at_index + index - if self.shuffle_enabled and len(queue_items) > 5: - queue_items = self.__shuffle_items(queue_items) + if self.settings.shuffle_enabled and len(queue_items) > 5: + queue_items = random.sample(queue_items, len(queue_items)) if offset == 0: # replace current item with new self._items = ( @@ -520,35 +618,26 @@ class PlayerQueue: if offset == 0: await self.play_index(insert_at_index) - self.mass.signal_event( - MassEvent(EventType.QUEUE_ITEMS_UPDATED, object_id=self.queue_id, data=self) - ) - await self._save_state() + self.signal_update(True) async def append(self, queue_items: List[QueueItem]) -> None: """Append new items at the end of the queue.""" for index, item in enumerate(queue_items): item.sort_index = len(self.items) + index - if self.shuffle_enabled: + if self.settings.shuffle_enabled: played_items = self.items[: self._current_index] next_items = self.items[self._current_index + 1 :] + queue_items - next_items = self.__shuffle_items(next_items) + next_items = random.sample(next_items, len(next_items)) items = played_items + [self.current_item] + next_items await self.update(items) return self._items = self._items + queue_items - self.mass.signal_event( - MassEvent(EventType.QUEUE_ITEMS_UPDATED, object_id=self.queue_id, data=self) - ) - await self._save_state() + self.signal_update(True) async def update(self, queue_items: List[QueueItem]) -> None: """Update the existing queue items, mostly caused by reordering.""" self._items = queue_items - self.mass.signal_event( - MassEvent(EventType.QUEUE_ITEMS_UPDATED, object_id=self.queue_id, data=self) - ) - await self._save_state() + self.signal_update(True) async def clear(self) -> None: """Clear all items in the queue.""" @@ -611,10 +700,7 @@ class PlayerQueue: self._current_item_elapsed_time = int(track_time) if new_item_loaded: - self.mass.create_task(self._save_state()) - self.mass.signal_event( - MassEvent(EventType.QUEUE_UPDATED, object_id=self.queue_id, data=self) - ) + self.signal_update() if abs(prev_item_time - self._current_item_elapsed_time) >= 1: self.mass.signal_event( MassEvent( @@ -659,9 +745,11 @@ class PlayerQueue: if index is None: # guard just in case return 0 + if self.settings.repeat_mode == RepeatMode.SINGLE: + return index if len(self._items) > (index + 1): return index + 1 - if self.repeat_enabled: + if self.settings.repeat_mode == RepeatMode.ALL: # repeat enabled, start queue at beginning return 0 return None @@ -670,6 +758,20 @@ class PlayerQueue: """Indicate that queue stream needs to start next index once playback finished.""" self._signal_next = True + def signal_update(self, items_changed: bool = False) -> None: + """Signal state changed of this queue.""" + if items_changed: + self.mass.create_task(self._save_items()) + self.mass.signal_event( + MassEvent( + EventType.QUEUE_ITEMS_UPDATED, object_id=self.queue_id, data=self + ) + ) + else: + self.mass.signal_event( + MassEvent(EventType.QUEUE_UPDATED, object_id=self.queue_id, data=self) + ) + def to_dict(self) -> Dict[str, Any]: """Export object to dict.""" cur_item = self.current_item.to_dict() if self.current_item else None @@ -685,11 +787,7 @@ class PlayerQueue: "current_index": self.current_index, "current_item": cur_item, "next_item": next_item, - "shuffle_enabled": self.shuffle_enabled, - "repeat_enabled": self.repeat_enabled, - "volume_normalization_enabled": self.volume_normalization_enabled, - "volume_normalization_target": self.volume_normalization_target, - "crossfade_duration": self.crossfade_duration, + "settings": self.settings.to_dict(), } def __get_queue_stream_index(self) -> Tuple[int, int]: @@ -716,46 +814,26 @@ class PlayerQueue: break return queue_index, track_time - @staticmethod - def __shuffle_items(queue_items: List[QueueItem]) -> List[QueueItem]: - """Shuffle a list of tracks.""" - # for now we use default python random function - # can be extended with some more magic based on last_played and stuff - return random.sample(queue_items, len(queue_items)) - - async def _restore_saved_state(self) -> None: - """Try to load the saved state from database.""" - if db_row := await self.mass.database.get_row( - "queue_settings", {"queue_id": self.queue_id} - ): - self._shuffle_enabled = bool(db_row["shuffle_enabled"]) - self._repeat_enabled = bool(db_row["repeat_enabled"]) - self._crossfade_duration = db_row["crossfade_duration"] + async def _restore_items(self) -> None: + """Try to load the saved state from cache.""" if queue_cache := await self.mass.cache.get(f"queue_items.{self.queue_id}"): - self._items = [QueueItem.from_dict(x) for x in queue_cache["items"]] - self._current_index = queue_cache["current_index"] - - async def _save_state(self, save_items: bool = True) -> None: - """Save state in database.""" - # save queue settings in db - await self.mass.database.insert_or_replace( - "queue_settings", + try: + self._items = [QueueItem.from_dict(x) for x in queue_cache["items"]] + self._current_index = queue_cache["current_index"] + except (KeyError, AttributeError, TypeError) as err: + self.logger.warning( + "Unable to restore queue state for queue %s", + self.queue_id, + exc_info=err, + ) + await self.settings.restore() + + async def _save_items(self) -> None: + """Save current queue items/state in cache.""" + await self.mass.cache.set( + f"queue_items.{self.queue_id}", { - "queue_id": self.queue_id, - "shuffle_enabled": self._shuffle_enabled, - "repeat_enabled": self.repeat_enabled, - "crossfade_duration": self._crossfade_duration, - "volume_normalization_enabled": self._volume_normalization_enabled, - "volume_normalization_target": self._volume_normalization_target, + "items": [x.to_dict() for x in self._items], + "current_index": self._current_index, }, ) - - # store current items in cache - if save_items: - await self.mass.cache.set( - f"queue_items.{self.queue_id}", - { - "items": [x.to_dict() for x in self._items], - "current_index": self._current_index, - }, - )