From: Marcel van der Veldt Date: Tue, 12 Jul 2022 21:08:27 +0000 (+0200) Subject: Make Announce feature more robust (#409) X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=9c10abd4c72ae332cbd8e8fa6f0fd928620e3d80;p=music-assistant-server.git Make Announce feature more robust (#409) * Rename alert to announce (use same naming as HA) * Make the Announce/alert feature more robust - recover from unexpected conditions - fix fade-in - use silence stream to detect end of playback - handle edge cases (discovered so far) * prevent queue command while announcement busy * create setting for announce volume * fix wrong loudness for url provider --- diff --git a/music_assistant/controllers/music/__init__.py b/music_assistant/controllers/music/__init__.py index 06a3c3a6..4c25de40 100755 --- a/music_assistant/controllers/music/__init__.py +++ b/music_assistant/controllers/music/__init__.py @@ -329,6 +329,9 @@ class MusicController: async def get_provider_loudness(self, provider: ProviderType) -> float | None: """Get average integrated loudness for tracks of given provider.""" all_items = [] + if provider == ProviderType.URL: + # this is not a very good idea for random urls + return None for db_row in await self.mass.database.get_rows( TABLE_TRACK_LOUDNESS, { diff --git a/music_assistant/controllers/streams.py b/music_assistant/controllers/streams.py index 15fa89b8..16a854c8 100644 --- a/music_assistant/controllers/streams.py +++ b/music_assistant/controllers/streams.py @@ -14,7 +14,6 @@ from aiohttp import web from music_assistant.helpers.audio import ( check_audio_support, crossfade_pcm_parts, - fadein_pcm_part, get_chunksize, get_media_stream, get_preview_stream, @@ -76,6 +75,14 @@ class StreamsController: """Return url to control endpoint.""" return f"{self.base_url}/{queue_id}/{control}" + def get_silence_url( + self, + content_type: ContentType = ContentType.WAV, + ) -> str: + """Generate stream url for a silence Stream.""" + ext = content_type.value + return f"{self.base_url}/silence.{ext}" + async def setup(self) -> None: """Async initialize of module.""" app = web.Application() @@ -147,7 +154,7 @@ class StreamsController: @staticmethod async def serve_silence(request: web.Request): """Serve some nice silence.""" - duration = int(request.query.get("duration", 60)) + duration = int(request.query.get("duration", 3600)) fmt = ContentType.try_parse(request.match_info["fmt"]) resp = web.StreamResponse( @@ -238,7 +245,6 @@ class StreamsController: seek_position: int, fade_in: bool, output_format: ContentType, - is_alert: bool, ) -> QueueStream: """Start running a queue stream.""" # generate unique stream url @@ -252,8 +258,8 @@ class StreamsController: streamdetails = await get_stream_details(self.mass, first_item, queue.queue_id) # work out pcm details - if is_alert: - pcm_sample_rate = 41000 + if streamdetails.media_type == MediaType.ANNOUNCEMENT: + pcm_sample_rate = 44100 pcm_bit_depth = 16 pcm_channels = 2 allow_resample = True @@ -284,7 +290,6 @@ class StreamsController: pcm_bit_depth=pcm_bit_depth, pcm_channels=pcm_channels, allow_resample=allow_resample, - is_alert=is_alert, autostart=True, ) # cleanup stale previous queue tasks @@ -317,7 +322,6 @@ class QueueStream: pcm_channels: int = 2, pcm_floating_point: bool = False, allow_resample: bool = False, - is_alert: bool = False, autostart: bool = False, ): """Init QueueStreamJob instance.""" @@ -332,7 +336,6 @@ class QueueStream: self.pcm_channels = pcm_channels self.pcm_floating_point = pcm_floating_point self.allow_resample = allow_resample - self.is_alert = is_alert self.url = queue.mass.streams.get_stream_url(stream_id, output_format) self.mass = queue.mass @@ -422,6 +425,12 @@ class QueueStream: # add metadata "-metadata", "title=Streaming from Music Assistant", + ] + # fade-in if needed + if self.fade_in: + ffmpeg_args += ["-af", "afade=t=in:st=0:d=5"] + + ffmpeg_args += [ # output args "-f", self.output_format.value, @@ -513,7 +522,6 @@ class QueueStream: if track_count == 1: queue_index = self.start_index seek_position = self.seek_position - fade_in = self.fade_in else: next_index = self.queue.get_next_index(queue_index) # break here if repeat is enabled @@ -522,7 +530,6 @@ class QueueStream: break queue_index = next_index seek_position = 0 - fade_in = False self.index_in_buffer = queue_index # send signal that we've loaded a new track into the buffer self.queue.signal_update() @@ -567,9 +574,16 @@ class QueueStream: break # check crossfade ability - use_crossfade = self.queue.settings.crossfade_mode != CrossFadeMode.DISABLED + use_crossfade = ( + self.queue.settings.crossfade_mode != CrossFadeMode.DISABLED + and self.queue.settings.crossfade_duration > 0 + and streamdetails.media_type != MediaType.ANNOUNCEMENT + ) + # do not crossfade tracks of same album if ( - prev_track is not None + use_crossfade + and self.queue.settings.crossfade_mode != CrossFadeMode.ALWAYS + and prev_track is not None and prev_track.media_type == MediaType.TRACK and queue_track.media_type == MediaType.TRACK ): @@ -580,33 +594,35 @@ class QueueStream: and new_item.album is not None and prev_item.album == new_item.album ): + self.logger.debug("Skipping crossfade: Tracks are from same album") use_crossfade = False prev_track = queue_track - # calculate sample_size based on PCM params for 200ms of audio + # calculate sample_size based on PCM params for 1 second of audio input_format = ContentType.from_bit_depth( self.pcm_bit_depth, self.pcm_floating_point ) - sample_size = get_chunksize( + sample_size_per_second = get_chunksize( input_format, self.pcm_sample_rate, self.pcm_bit_depth, self.pcm_channels, ) - # buffer size is duration of crossfade + 3 seconds - crossfade_duration = self.queue.settings.crossfade_duration or fade_in or 1 - crossfade_size = (sample_size * 5) * crossfade_duration - buf_size = (sample_size * 5) * (crossfade_duration * 3) - total_size = (sample_size * 5) * (queue_track.duration or 0) + crossfade_duration = self.queue.settings.crossfade_duration + crossfade_size = sample_size_per_second * crossfade_duration + # buffer_duration has some overhead to account for padded silence + buffer_duration = (crossfade_duration or 2) * 2 + # predict total size to expect for this track from duration + stream_duration = (queue_track.duration or 0) - seek_position self.logger.info( - "Start Streaming queue track: %s (%s) for queue %s", + "Start Streaming queue track: %s (%s) for queue %s - crossfade: %s", queue_track.uri, queue_track.name, self.queue.player.name, + use_crossfade, ) queue_track.streamdetails.seconds_skipped = seek_position - chunk_count = 0 buffer = b"" bytes_written = 0 # handle incoming audio chunks @@ -617,8 +633,11 @@ class QueueStream: sample_rate=self.pcm_sample_rate, channels=self.pcm_channels, seek_position=seek_position, + chunk_size=sample_size_per_second, ): - chunk_count += 1 + + seconds_streamed = bytes_written / sample_size_per_second + seconds_in_buffer = len(buffer) / sample_size_per_second #### HANDLE FIRST PART OF TRACK @@ -628,96 +647,92 @@ class QueueStream: queue_track.streamdetails.seconds_streamed = 0 break - # track has no duration or duration < 30s: pypass any further processing - if queue_track.duration is None or queue_track.duration < 30: - bytes_written += len(chunk) + # bypass any processing for radiostreams and announcements + if ( + streamdetails.media_type == MediaType.ANNOUNCEMENT + or not stream_duration + ): yield chunk - continue - - # first part of track and we need to (cross)fade: fill buffer - if bytes_written < buf_size and (last_fadeout_part or fade_in): - bytes_written += len(chunk) - buffer += chunk - continue - - # last part of track: fill buffer - if bytes_written >= (total_size - buf_size): bytes_written += len(chunk) - buffer += chunk continue - # buffer full for fade-in / crossfade - if buffer and (last_fadeout_part or fade_in): - # strip silence of start and create fade-in part + # buffer full for crossfade + if last_fadeout_part and (seconds_in_buffer >= buffer_duration): + # strip silence of start first_part = await strip_silence( buffer + chunk, pcm_fmt, self.pcm_sample_rate ) - - if last_fadeout_part: - # crossfade - fadein_part = first_part[:crossfade_size] - remaining_bytes = first_part[crossfade_size:] - crossfade_part = await crossfade_pcm_parts( - fadein_part, - last_fadeout_part, - crossfade_duration, - pcm_fmt, - self.pcm_sample_rate, - ) - # send crossfade_part - yield crossfade_part - bytes_written += len(crossfade_part) - # also write the leftover bytes from the strip action - if remaining_bytes: - yield remaining_bytes - bytes_written += len(remaining_bytes) - else: - # fade-in - fadein_part = await fadein_pcm_part( - first_part, - fade_in, - pcm_fmt, - self.pcm_sample_rate, - ) - yield fadein_part - bytes_written += len(fadein_part) + # perform crossfade + fadein_part = first_part[:crossfade_size] + remaining_bytes = first_part[crossfade_size:] + crossfade_part = await crossfade_pcm_parts( + fadein_part, + last_fadeout_part, + crossfade_duration, + pcm_fmt, + self.pcm_sample_rate, + ) + # send crossfade_part + yield crossfade_part + bytes_written += len(crossfade_part) + # also write the leftover bytes from the strip action + if remaining_bytes: + yield remaining_bytes + bytes_written += len(remaining_bytes) # clear vars last_fadeout_part = b"" buffer = b"" continue - # all other: middle of track or no fade actions, just yield the audio - bytes_written += len(chunk) + # first part of track and we need to crossfade: fill buffer + if last_fadeout_part: + buffer += chunk + continue + + # last part of track: fill buffer + if buffer or (seconds_streamed >= (stream_duration - buffer_duration)): + buffer += chunk + continue + + # all other: middle of track or no crossfade action, just yield the audio yield chunk + bytes_written += len(chunk) continue #### HANDLE END OF TRACK + self.logger.debug( + "end of track reached - seconds_streamed: %s - seconds_in_buffer: %s - stream_duration: %s", + seconds_streamed, + seconds_in_buffer, + stream_duration, + ) if buffer: # strip silence from end of audio last_part = await strip_silence( buffer, pcm_fmt, self.pcm_sample_rate, reverse=True ) - - # handle crossfading support - # store fade section to be picked up for next track - - if use_crossfade: - # crossfade is enabled, save fadeout part to pickup for next track - last_part = last_part[-crossfade_size:] + # if crossfade is enabled, save fadeout part to pickup for next track + if use_crossfade and len(last_part) > crossfade_size: + # yield remaining bytes from strip action, + # we only need the crossfade_size part + last_fadeout_part = last_part[-crossfade_size:] remaining_bytes = last_part[:-crossfade_size] - # yield remaining bytes - bytes_written += len(remaining_bytes) yield remaining_bytes + bytes_written += len(remaining_bytes) + elif use_crossfade: last_fadeout_part = last_part else: # no crossfade enabled, just yield the stripped audio data - bytes_written += len(last_part) yield last_part + bytes_written += len(last_part) - # end of the track reached - queue_track.streamdetails.seconds_streamed = bytes_written / sample_size + # end of the track reached - store accurate duration + buffer = b"" + queue_track.streamdetails.seconds_streamed = ( + bytes_written / sample_size_per_second + ) self.logger.debug( "Finished Streaming queue track: %s (%s) on queue %s", queue_track.uri, diff --git a/music_assistant/helpers/audio.py b/music_assistant/helpers/audio.py index c7542016..444b7228 100644 --- a/music_assistant/helpers/audio.py +++ b/music_assistant/helpers/audio.py @@ -11,7 +11,7 @@ from time import time from typing import TYPE_CHECKING, AsyncGenerator, List, Optional, Tuple import aiofiles -from aiohttp import ClientError, ClientTimeout +from aiohttp import ClientTimeout from music_assistant.helpers.process import AsyncProcess, check_output from music_assistant.helpers.util import create_tempfile @@ -81,51 +81,21 @@ async def crossfade_pcm_parts( ] async with AsyncProcess(args, True) as proc: crossfade_data, _ = await proc.communicate(fade_in_part) + if crossfade_data: + LOGGER.debug( + "crossfaded 2 pcm chunks. fade_in_part: %s - fade_out_part: %s - result: %s", + len(fade_in_part), + len(fade_out_part), + len(crossfade_data), + ) + return crossfade_data + # no crossfade_data, return original data instead LOGGER.debug( - "crossfaded 2 pcm chunks. fade_in_part: %s - fade_out_part: %s - result: %s", + "crossfade of pcm chunks failed: not enough data. fade_in_part: %s - fade_out_part: %s", len(fade_in_part), len(fade_out_part), - len(crossfade_data), ) - return crossfade_data - - -async def fadein_pcm_part( - pcm_audio: bytes, - fade_length: int, - fmt: ContentType, - sample_rate: int, - channels: int = 2, -) -> bytes: - """Fadein chunk of pcm/raw audio using ffmpeg.""" - args = [ - # generic args - "ffmpeg", - "-hide_banner", - "-loglevel", - "quiet", - # fade_in part (stdin) - "-acodec", - fmt.name.lower(), - "-f", - fmt.value, - "-ac", - str(channels), - "-ar", - str(sample_rate), - "-i", - "-", - # filter args - "-af", - f"afade=type=in:start_time=0:duration={fade_length}", - # output args - "-f", - fmt.value, - "-", - ] - async with AsyncProcess(args, True) as proc: - result_audio, _ = await proc.communicate(pcm_audio) - return result_audio + return fade_out_part + fade_in_part async def strip_silence( @@ -152,9 +122,15 @@ async def strip_silence( ] # filter args if reverse: - args += ["-af", "areverse,silenceremove=1:0:-50dB:detection=peak,areverse"] + args += [ + "-af", + "areverse,atrim=start=0.2,silenceremove=start_periods=1:start_silence=0.1:start_threshold=0.02,areverse", + ] else: - args += ["-af", "silenceremove=1:0:-50dB:detection=peak"] + args += [ + "-af", + "atrim=start=0.2,silenceremove=start_periods=1:start_silence=0.1:start_threshold=0.02", + ] # output args args += ["-f", fmt.value, "-"] async with AsyncProcess(args, True) as proc: @@ -251,29 +227,29 @@ async def get_stream_details( queue_item.streamdetails.seconds_skipped = 0 queue_item.streamdetails.seconds_streamed = 0 streamdetails = queue_item.streamdetails - - # fetch streamdetails from provider - # always request the full item as there might be other qualities available - full_item = await mass.music.get_item_by_uri(queue_item.uri) - # sort by quality and check track availability - for prov_media in sorted( - full_item.provider_ids, key=lambda x: x.quality or 0, reverse=True - ): - if not prov_media.available: - continue - # get streamdetails from provider - music_prov = mass.music.get_provider(prov_media.prov_id) - if not music_prov or not music_prov.available: - continue # provider temporary unavailable ? - try: - streamdetails: StreamDetails = await music_prov.get_stream_details( - prov_media.item_id - ) - streamdetails.content_type = ContentType(streamdetails.content_type) - except MusicAssistantError as err: - LOGGER.warning(str(err)) - else: - break + else: + # fetch streamdetails from provider + # always request the full item as there might be other qualities available + full_item = await mass.music.get_item_by_uri(queue_item.uri) + # sort by quality and check track availability + for prov_media in sorted( + full_item.provider_ids, key=lambda x: x.quality or 0, reverse=True + ): + if not prov_media.available: + continue + # get streamdetails from provider + music_prov = mass.music.get_provider(prov_media.prov_id) + if not music_prov or not music_prov.available: + continue # provider temporary unavailable ? + try: + streamdetails: StreamDetails = await music_prov.get_stream_details( + prov_media.item_id + ) + streamdetails.content_type = ContentType(streamdetails.content_type) + except MusicAssistantError as err: + LOGGER.warning(str(err)) + else: + break if not streamdetails: raise MediaNotFoundError(f"Unable to retrieve streamdetails for {queue_item}") @@ -382,6 +358,7 @@ async def get_media_stream( sample_rate: int, channels: int = 2, seek_position: int = 0, + chunk_size: int = 64000, ) -> AsyncGenerator[bytes, None]: """Get the PCM audio stream for the given streamdetails.""" assert pcm_fmt.is_pcm(), "Output format must be a PCM type" @@ -409,9 +386,8 @@ async def get_media_stream( ffmpeg_proc.attach_task(writer()) # yield chunks from stdout - sample_size = get_chunksize(pcm_fmt, sample_rate, 24, channels, 10) try: - async for chunk in ffmpeg_proc.iter_any(sample_size): + async for chunk in ffmpeg_proc.iter_chunked(chunk_size): yield chunk except (asyncio.CancelledError, GeneratorExit) as err: @@ -436,47 +412,33 @@ async def get_radio_stream( ) -> AsyncGenerator[bytes, None]: """Get radio audio stream from HTTP, including metadata retrieval.""" headers = {"Icy-MetaData": "1"} - timeout = ClientTimeout(total=0, connect=10, sock_read=10) - reconnects = 0 - while True: - # in loop to reconnect on connection failure - try: - LOGGER.debug("radio stream (re)connecting to: %s", url) - async with mass.http_session.get( - url, headers=headers, timeout=timeout - ) as resp: - headers = resp.headers - meta_int = int(headers.get("icy-metaint", "0")) - # stream with ICY Metadata - if meta_int: - while True: - audio_chunk = await resp.content.readexactly(meta_int) - yield audio_chunk - meta_byte = await resp.content.readexactly(1) - meta_length = ord(meta_byte) * 16 - meta_data = await resp.content.readexactly(meta_length) - if not meta_data: - continue - meta_data = meta_data.rstrip(b"\0") - stream_title = re.search(rb"StreamTitle='([^']*)';", meta_data) - if not stream_title: - continue - stream_title = stream_title.group(1).decode() - if stream_title != streamdetails.stream_title: - streamdetails.stream_title = stream_title - if queue := mass.players.get_player_queue( - streamdetails.queue_id - ): - queue.signal_update() - # Regular HTTP stream - else: - async for chunk in resp.content.iter_any(): - yield chunk - except (asyncio.exceptions.TimeoutError, ClientError) as err: - # reconnect on http error (max 5 times) - if reconnects >= 5: - raise err - reconnects += 1 + timeout = ClientTimeout(total=0, connect=30, sock_read=120) + async with mass.http_session.get(url, headers=headers, timeout=timeout) as resp: + headers = resp.headers + meta_int = int(headers.get("icy-metaint", "0")) + # stream with ICY Metadata + if meta_int: + while True: + audio_chunk = await resp.content.readexactly(meta_int) + yield audio_chunk + meta_byte = await resp.content.readexactly(1) + meta_length = ord(meta_byte) * 16 + meta_data = await resp.content.readexactly(meta_length) + if not meta_data: + continue + meta_data = meta_data.rstrip(b"\0") + stream_title = re.search(rb"StreamTitle='([^']*)';", meta_data) + if not stream_title: + continue + stream_title = stream_title.group(1).decode() + if stream_title != streamdetails.stream_title: + streamdetails.stream_title = stream_title + if queue := mass.players.get_player_queue(streamdetails.queue_id): + queue.signal_update() + # Regular HTTP stream + else: + async for chunk in resp.content.iter_any(): + yield chunk async def get_http_stream( @@ -504,7 +466,8 @@ async def get_http_stream( buffer = b"" buffer_all = False bytes_received = 0 - async with mass.http_session.get(url, headers=headers) as resp: + timeout = ClientTimeout(total=0, connect=30, sock_read=120) + async with mass.http_session.get(url, headers=headers, timeout=timeout) as resp: is_partial = resp.status == 206 buffer_all = seek_position and not is_partial async for chunk in resp.content.iter_any(): diff --git a/music_assistant/helpers/database.py b/music_assistant/helpers/database.py index 54a10eea..3f237dd4 100755 --- a/music_assistant/helpers/database.py +++ b/music_assistant/helpers/database.py @@ -49,7 +49,9 @@ class Database: async def get_setting(self, key: str) -> str | None: """Get setting from settings table.""" - return await self.get_row(TABLE_SETTINGS, {"key": key}) + if db_row := await self.get_row(TABLE_SETTINGS, {"key": key}): + return db_row["value"] + return None async def set_setting(self, key: str, value: str) -> None: """Set setting in settings table.""" @@ -177,7 +179,7 @@ class Database: await self.__create_database_tables() try: if prev_version := await self.get_setting("version"): - prev_version = int(prev_version["value"]) + prev_version = int(prev_version) else: prev_version = 0 except (KeyError, ValueError): diff --git a/music_assistant/helpers/tags.py b/music_assistant/helpers/tags.py index 1a02a5d6..fa4547fa 100644 --- a/music_assistant/helpers/tags.py +++ b/music_assistant/helpers/tags.py @@ -151,7 +151,7 @@ class AudioTags: # convert all tag-keys to lowercase without spaces tags = { key.lower().replace(" ", "").replace("_", ""): value - for key, value in raw["format"]["tags"].items() + for key, value in raw["format"].get("tags", {}).items() } return AudioTags( diff --git a/music_assistant/models/enums.py b/music_assistant/models/enums.py index 65167877..3e7b91e1 100644 --- a/music_assistant/models/enums.py +++ b/music_assistant/models/enums.py @@ -12,6 +12,7 @@ class MediaType(Enum): PLAYLIST = "playlist" RADIO = "radio" FOLDER = "folder" + ANNOUNCEMENT = "announcement" UNKNOWN = "unknown" diff --git a/music_assistant/models/player.py b/music_assistant/models/player.py index 43ee4c1e..35840286 100755 --- a/music_assistant/models/player.py +++ b/music_assistant/models/player.py @@ -4,7 +4,7 @@ from __future__ import annotations import asyncio from abc import ABC from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Dict, List, Tuple +from typing import TYPE_CHECKING, Any, Dict, List from mashumaro import DataClassDictMixin @@ -42,8 +42,8 @@ class Player(ABC): _attr_volume_level: int = 100 _attr_volume_muted: bool = False _attr_device_info: DeviceInfo = DeviceInfo() - _attr_default_sample_rates: Tuple[int] = (44100, 48000, 88200, 96000) - _attr_default_stream_type: ContentType = ContentType.FLAC + _attr_max_sample_rate: int = 96000 + _attr_stream_type: ContentType = ContentType.FLAC # below objects will be set by playermanager at register/update mass: MusicAssistant = None # type: ignore[assignment] _prev_state: dict = {} @@ -125,15 +125,15 @@ class Player(ABC): # DEFAULT PLAYER SETTINGS @property - def default_sample_rates(self) -> Tuple[int]: - """Return the default supported sample rates.""" + def max_sample_rate(self) -> int: + """Return the (default) max supported sample rate.""" # if a player does not report/set its supported sample rates, we use a pretty safe default - return self._attr_default_sample_rates + return self._attr_max_sample_rate @property - def default_stream_type(self) -> ContentType: - """Return the default content type to use for streaming.""" - return self._attr_default_stream_type + def stream_type(self) -> ContentType: + """Return the default/preferred content type to use for streaming.""" + return self._attr_stream_type # GROUP PLAYER ATTRIBUTES AND METHODS (may be overridden if needed) # a player can optionally be a group leader (e.g. Sonos) diff --git a/music_assistant/models/player_queue.py b/music_assistant/models/player_queue.py index a81bfeec..6e9d0403 100644 --- a/music_assistant/models/player_queue.py +++ b/music_assistant/models/player_queue.py @@ -9,6 +9,7 @@ from dataclasses import dataclass from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union from music_assistant.models.enums import ( + ContentType, EventType, MediaType, ProviderType, @@ -17,7 +18,11 @@ from music_assistant.models.enums import ( ) from music_assistant.models.errors import MediaNotFoundError, MusicAssistantError from music_assistant.models.event import MassEvent -from music_assistant.models.media_items import MediaItemType, media_from_dict +from music_assistant.models.media_items import ( + MediaItemType, + StreamDetails, + media_from_dict, +) from .player import Player, PlayerState from .queue_item import QueueItem @@ -46,8 +51,8 @@ class QueueSnapShot: items: List[QueueItem] index: Optional[int] position: int - repeat: RepeatMode - shuffle: bool + settings: dict + volume_level: int class PlayerQueue: @@ -70,6 +75,7 @@ class PlayerQueue: self._last_player_update: int = 0 self._last_stream_id: str = "" self._snapshot: Optional[QueueSnapShot] = None + self._announcement_in_progress: bool = False async def setup(self) -> None: """Handle async setup of instance.""" @@ -105,8 +111,9 @@ class PlayerQueue: if not self._stream_id: return None if stream := self.mass.streams.queue_streams.get(self._stream_id): - return stream.index_in_buffer - return None + if not stream.done.is_set(): + return stream.index_in_buffer + return self.current_index @property def active(self) -> bool: @@ -198,6 +205,9 @@ class PlayerQueue: QueueOption.ADD -> Append new items at end of the queue :param passive: if passive set to true the stream url will not be sent to the player. """ + if self._announcement_in_progress: + self.logger.warning("Ignore queue command: An announcement is in progress") + return # a single item or list of items may be provided if not isinstance(media, list): media = [media] @@ -262,76 +272,114 @@ class PlayerQueue: elif queue_opt == QueueOption.ADD: await self.append(queue_items) - async def play_alert( - self, uri: str, announce: bool = False, gain_correct: int = 6 - ) -> str: + async def play_announcement(self, url: str, prepend_alert: bool = False) -> str: """ - Play given uri as Alert on the queue. + Play given uri as Announcement on the queue. - uri: Uri that should be played as announcement, can be Music Assistant URI or plain url. - announce: Prepend the (TTS) alert with a small announce sound. - gain_correct: Adjust the gain of the alert sound (in dB). + url: URL that should be played as announcement, can only be plain url. + prepend_alert: Prepend the (TTS) announcement with an alert bell sound. """ - assert not self._snapshot, "Alert already in progress" + if self._announcement_in_progress: + self.logger.warning( + "Ignore queue command: An announcement is (already) in progress" + ) + return - # create snapshot - await self.snapshot_create() + def create_announcement(_url: str): + return QueueItem( + uri=_url, + name="announcement", + duration=30, + streamdetails=StreamDetails( + provider=ProviderType.URL, + item_id=_url, + content_type=ContentType.try_parse(_url), + media_type=MediaType.ANNOUNCEMENT, + loudness=0, + gain_correct=4, + data=_url, + ), + media_type=MediaType.ANNOUNCEMENT, + ) - queue_items = [] + try: + # create snapshot + await self.snapshot_create() + # stop player if needed + if self.active and self.player.state in ( + PlayerState.PLAYING, + PlayerState.PAUSED, + ): + await self.stop() + await self._wait_for_state((PlayerState.OFF, PlayerState.IDLE)) + self._announcement_in_progress = True + + # turn on player if needed + if not self.player.powered: + await self.player.power(True) + await self._wait_for_state(PlayerState.IDLE) + + # adjust volume if needed + if self._settings.announce_volume_increase: + announce_volume = ( + self.player.volume_level + self._settings.announce_volume_increase + ) + announce_volume = min(announce_volume, 100) + announce_volume = max(announce_volume, 0) + await self.player.volume_set(announce_volume) + + # adjust queue settings for announce playback + self._settings.from_dict( + { + "repeat_mode": "off", + "shuffle_enabled": False, + } + ) - # prepend annnounce sound if needed - if announce: - url_prov = self.mass.music.get_provider(ProviderType.URL) - media_item = await url_prov.parse_item(ALERT_ANNOUNCE_FILE) - queue_item = QueueItem.from_media_item(media_item) - queue_items.append(queue_item) + queue_items = [] + # prepend alert sound if needed + if prepend_alert: + queue_items.append(create_announcement(ALERT_ANNOUNCE_FILE)) - # parse provided uri into a MA MediaItem - try: - media_item = await self.mass.music.get_item_by_uri(uri) - queue_items.append(QueueItem.from_media_item(media_item)) - except MusicAssistantError as err: - # invalid MA uri or item not found error - raise MediaNotFoundError(f"Invalid uri: {uri}") from err + queue_items.append(create_announcement(url)) - # start queue with alert sound(s) - self._items = queue_items - stream = await self.queue_stream_start( - start_index=0, seek_position=0, fade_in=False, is_alert=True - ) - # execute the play command on the player(s) - await self.player.play_url(stream.url) + # append silence track. we use that as a reliable way to make sure + # there is enough buffer for the player to start quickly + # and to detect when we finished playing the alert + silence_item = create_announcement(self.mass.streams.get_silence_url()) + queue_items.append(silence_item) - # wait for the player to finish playing - play_started = asyncio.Event() - play_stopped = asyncio.Event() + # start queue with announcement sound(s) + self._items = queue_items + stream = await self.queue_stream_start(start_index=0, seek_position=0) + # execute the play command on the player(s) + await self.player.play_url(stream.url) - def handle_event(evt: MassEvent): - if self.player.state == PlayerState.PLAYING: - play_started.set() - elif play_started.is_set(): - play_stopped.set() + # wait for the player to finish playing + await asyncio.sleep(5) + await self._wait_for_state(PlayerState.PLAYING, silence_item.item_id) - unsub = self.mass.subscribe( - handle_event, EventType.QUEUE_UPDATED, self.queue_id - ) - try: - await asyncio.wait_for(play_stopped.wait(), 30) - except asyncio.TimeoutError: - self.logger.warning("Timeout while playing alert") + except Exception as err: # pylint: disable=broad-except + self.logger.exception("Error while playing announcement", exc_info=err) finally: - unsub() # restore queue + self._announcement_in_progress = False await self.snapshot_restore() async def stop(self) -> None: """Stop command on queue player.""" + if self._announcement_in_progress: + self.logger.warning("Ignore queue command: An announcement is in progress") + return self.signal_next = False # redirect to underlying player await self.player.stop() async def play(self) -> None: """Play (unpause) command on queue player.""" + if self._announcement_in_progress: + self.logger.warning("Ignore queue command: An announcement is in progress") + return if self.active and self.player.state == PlayerState.PAUSED: await self.player.play() else: @@ -339,6 +387,9 @@ class PlayerQueue: async def pause(self) -> None: """Pause command on queue player.""" + if self._announcement_in_progress: + self.logger.warning("Ignore queue command: An announcement is in progress") + return # redirect to underlying player await self.player.pause() @@ -391,7 +442,7 @@ class PlayerQueue: if resume_item is not None: resume_pos = resume_pos if resume_pos > 10 else 0 - fade_in = 5 if resume_pos else 0 + fade_in = resume_pos > 0 await self.play_index(resume_item.item_id, resume_pos, fade_in) elif len(self._items) > 0: # items available in queue but no previous track, start at 0 @@ -410,39 +461,48 @@ class PlayerQueue: items=self._items, index=self._current_index, position=self._current_item_elapsed_time, - repeat=self._settings.repeat_mode, - shuffle=self._settings.shuffle_enabled, + settings=self._settings.to_dict(), + volume_level=self.player.volume_level, ) async def snapshot_restore(self) -> None: """Restore snapshot of Queue state.""" assert self._snapshot, "Create snapshot before restoring it." - # clear queue first - await self.clear() - # restore queue - self._settings.repeat_mode = self._snapshot.repeat - self._settings.shuffle_enabled = self._snapshot.shuffle - await self._update_items(self._snapshot.items) - self._current_index = self._snapshot.index - self._current_item_elapsed_time = self._snapshot.position - if self._snapshot.state in (PlayerState.PLAYING, PlayerState.PAUSED): - await self.resume() - if self._snapshot.state == PlayerState.PAUSED: - await self.pause() - if not self._snapshot.powered: - await self.player.power(False) - # reset snapshot once restored - self.logger.debug("Restored snapshot...") - self._snapshot = None + try: + # clear queue first + await self.clear() + # restore volume if needed + if self._snapshot.volume_level != self.player.volume_level: + await self.player.volume_set(self._snapshot.volume_level) + # restore queue + self._settings.from_dict(self._snapshot.settings) + await self.update_items(self._snapshot.items) + self._current_index = self._snapshot.index + self._current_item_elapsed_time = self._snapshot.position + if self._snapshot.state in (PlayerState.PLAYING, PlayerState.PAUSED): + await self.resume() + if self._snapshot.state == PlayerState.PAUSED: + await self.pause() + if not self._snapshot.powered: + await self.player.power(False) + # reset snapshot once restored + self.logger.debug("Restored snapshot...") + except Exception as err: # pylint: disable=broad-except + self.logger.exception("Error while restoring snapshot", exc_info=err) + finally: + self._snapshot = None async def play_index( self, index: Union[int, str], seek_position: int = 0, - fade_in: int = 0, + fade_in: bool = False, passive: bool = False, ) -> None: """Play item at index (or item_id) X in queue.""" + if self._announcement_in_progress: + self.logger.warning("Ignore queue command: An announcement is in progress") + return if not isinstance(index, int): index = self.index_by_id(index) if index is None: @@ -480,7 +540,7 @@ class PlayerQueue: return # move the item in the list items.insert(new_index, items.pop(item_index)) - await self._update_items(items) + await self.update_items(items) async def delete_item(self, queue_item_id: str) -> None: """Delete item (by id or index) from the queue.""" @@ -563,13 +623,13 @@ class PlayerQueue: queue_items = played_items + cur_item + next_items else: queue_items = self._items + queue_items - await self._update_items(queue_items) + await self.update_items(queue_items) async def clear(self) -> None: """Clear all items in the queue.""" if self.player.state not in (PlayerState.IDLE, PlayerState.OFF): await self.stop() - await self._update_items([]) + await self.update_items([]) def on_player_update(self) -> None: """Call when player updates.""" @@ -600,6 +660,8 @@ class PlayerQueue: """Update queue details, called when player updates.""" if self.player.active_queue != self: return + if not self.active: + return new_index = self._current_index track_time = self._current_item_elapsed_time new_item_loaded = False @@ -636,14 +698,10 @@ class PlayerQueue: async def queue_stream_start( self, start_index: int, - seek_position: int, - fade_in: bool, - is_alert: bool = False, + seek_position: int = 0, + fade_in: bool = False, ) -> QueueStream: """Start the queue stream runner.""" - self._current_item_elapsed_time = 0 - self._current_index = start_index - # start the queue stream background task stream = await self.mass.streams.start_queue_stream( queue=self, @@ -651,21 +709,20 @@ class PlayerQueue: seek_position=seek_position, fade_in=fade_in, output_format=self._settings.stream_type, - is_alert=is_alert, ) self._stream_id = stream.stream_id + self._current_item_elapsed_time = 0 + self._current_index = start_index return stream def get_next_index(self, cur_index: Optional[int]) -> int: """Return the next index for the queue, accounting for repeat settings.""" - alert_active = self.stream and self.stream.is_alert # handle repeat single track - if not alert_active and self.settings.repeat_mode == RepeatMode.ONE: + if self.settings.repeat_mode == RepeatMode.ONE: return cur_index # handle repeat all if ( - not alert_active - and self.settings.repeat_mode == RepeatMode.ALL + self.settings.repeat_mode == RepeatMode.ALL and self._items and cur_index == (len(self._items) - 1) ): @@ -710,7 +767,7 @@ class PlayerQueue: "settings": self.settings.to_dict(), } - async def _update_items(self, queue_items: List[QueueItem]) -> None: + async def update_items(self, queue_items: List[QueueItem]) -> None: """Update the existing queue items, mostly caused by reordering.""" self._items = queue_items self.signal_update(True) @@ -778,3 +835,28 @@ class PlayerQueue: "current_item_elapsed_time": self._current_item_elapsed_time, }, ) + + async def _wait_for_state( + self, + state: Union[None, PlayerState, Tuple[PlayerState]], + queue_item_id: Optional[str] = None, + timeout: int = 30, + ) -> None: + """Wait for player(queue) to reach a specific state.""" + if state is not None and not isinstance(state, tuple): + state = (state,) + + count = 0 + while count < timeout * 10: + + if (state is None or self.player.state in state) and ( + queue_item_id is None + or self.current_item + and self.current_item.item_id == queue_item_id + ): + return + + count += 1 + await asyncio.sleep(0.1) + + raise TimeoutError(f"Timeout while waiting on state(s) {state}") diff --git a/music_assistant/models/queue_settings.py b/music_assistant/models/queue_settings.py index f88cf8fe..02e94f48 100644 --- a/music_assistant/models/queue_settings.py +++ b/music_assistant/models/queue_settings.py @@ -3,7 +3,7 @@ from __future__ import annotations import asyncio import random -from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple +from typing import TYPE_CHECKING, Any, Dict, Optional from .enums import ContentType, CrossFadeMode, RepeatMode @@ -24,8 +24,9 @@ class QueueSettings: self._crossfade_duration: int = 6 self._volume_normalization_enabled: bool = True self._volume_normalization_target: int = -14 - self._stream_type: Optional[ContentType] = None - self._sample_rates: Optional[Tuple[int]] = None + self._stream_type: ContentType = queue.player.stream_type + self._max_sample_rate: int = queue.player.max_sample_rate + self._announce_volume_increase: int = 15 @property def repeat_mode(self) -> RepeatMode: @@ -50,24 +51,29 @@ class QueueSettings: if not self._shuffle_enabled and enabled: # shuffle requested self._shuffle_enabled = True - if self._queue.current_index is not None: - played_items = self._queue.items[: self._queue.current_index] - next_items = self._queue.items[self._queue.current_index + 1 :] + cur_index = self._queue.index_in_buffer + cur_item = self._queue.get_item(cur_index) + if cur_item is not None: + played_items = self._queue.items[:cur_index] + next_items = self._queue.items[cur_index + 1 :] # for now we use default python random function # can be extended with some more magic based on last_played and stuff next_items = random.sample(next_items, len(next_items)) - items = played_items + [self._queue.current_item] + next_items - asyncio.create_task(self._queue.load(items)) + + items = played_items + [cur_item] + next_items + asyncio.create_task(self._queue.update_items(items)) self._on_update("shuffle_enabled") elif self._shuffle_enabled and not enabled: # unshuffle self._shuffle_enabled = False - if self._queue.current_index is not None: - played_items = self._queue.items[: self._queue.current_index] - next_items = self._queue.items[self._queue.current_index + 1 :] + cur_index = self._queue.index_in_buffer + cur_item = self._queue.get_item(cur_index) + if cur_item is not None: + played_items = self._queue.items[:cur_index] + next_items = self._queue.items[cur_index + 1 :] next_items.sort(key=lambda x: x.sort_index, reverse=False) - items = played_items + [self._queue.current_item] + next_items - asyncio.create_task(self._queue.load(items)) + items = played_items + [cur_item] + next_items + asyncio.create_task(self._queue.update_items(items)) self._on_update("shuffle_enabled") @property @@ -126,9 +132,6 @@ class QueueSettings: @property def stream_type(self) -> ContentType: """Return supported/preferred stream type for this playerqueue.""" - if self._stream_type is None: - # return player's default - return self._queue.player.default_stream_type return self._stream_type @stream_type.setter @@ -139,24 +142,28 @@ class QueueSettings: self._on_update("stream_type") @property - def sample_rates(self) -> Tuple[int]: - """Return supported/preferred sample rate(s) for this playerqueue.""" - if self._sample_rates is None: - # return player's default - return self._queue.player.default_sample_rates - return self._sample_rates - - @sample_rates.setter - def sample_rates(self, value: ContentType) -> None: + def max_sample_rate(self) -> int: + """Return max supported/needed sample rate(s) for this playerqueue.""" + return self._max_sample_rate + + @max_sample_rate.setter + def max_sample_rate(self, value: ContentType) -> None: """Set supported/preferred sample rate(s) for this playerqueue.""" - if self._stream_type != value: - self._stream_type = value - self._on_update("sample_rates") + if self._max_sample_rate != value: + self._max_sample_rate = value + self._on_update("max_sample_rate") @property - def max_sample_rate(self) -> int: - """Return the maximum samplerate supported by this playerqueue.""" - return max(self.sample_rates) + def announce_volume_increase(self) -> int: + """Return announce_volume_increase setting (percentage relative to current).""" + return self._announce_volume_increase + + @announce_volume_increase.setter + def announce_volume_increase(self, volume_increase: int) -> None: + """Set announce_volume_increase setting.""" + if self._announce_volume_increase != volume_increase: + self._announce_volume_increase = volume_increase + self._on_update("announce_volume_increase") def to_dict(self) -> Dict[str, Any]: """Return dict from settings.""" @@ -168,31 +175,45 @@ class QueueSettings: "volume_normalization_enabled": self.volume_normalization_enabled, "volume_normalization_target": self.volume_normalization_target, "stream_type": self.stream_type.value, - "sample_rates": self.sample_rates, + "max_sample_rate": self.max_sample_rate, + "announce_volume_increase": self.announce_volume_increase, } + def from_dict(self, d: Dict[str, Any]) -> None: + """Initialize settings from dict.""" + self._repeat_mode = RepeatMode(d.get("repeat_mode", self._repeat_mode.value)) + self._shuffle_enabled = bool(d.get("shuffle_enabled", self._shuffle_enabled)) + self._crossfade_mode = CrossFadeMode( + d.get("crossfade_mode", self._crossfade_mode.value) + ) + self._crossfade_duration = int( + d.get("crossfade_duration", self._crossfade_duration) + ) + self._volume_normalization_enabled = bool( + d.get("volume_normalization_enabled", self._volume_normalization_enabled) + ) + self._volume_normalization_target = float( + d.get("volume_normalization_target", self._volume_normalization_target) + ) + self._stream_type = ContentType(d.get("stream_type", self._stream_type.value)) + self._max_sample_rate = int(d.get("max_sample_rate", self._max_sample_rate)) + self._announce_volume_increase = int( + d.get("announce_volume_increase", self._announce_volume_increase) + ) + async def restore(self) -> None: """Restore state from db.""" - for key, val_type in ( - ("repeat_mode", RepeatMode), - ("crossfade_mode", CrossFadeMode), - ("shuffle_enabled", bool), - ("crossfade_duration", int), - ("volume_normalization_enabled", bool), - ("volume_normalization_target", float), - ("stream_type", ContentType), - ("sample_rates", tuple), - ): + values = {} + for key in self.to_dict(): db_key = f"{self._queue.queue_id}_{key}" if db_value := await self.mass.database.get_setting(db_key): - value = val_type(db_value["value"]) - setattr(self, f"_{key}", value) + values[key] = db_value + self.from_dict(values) def _on_update(self, changed_key: Optional[str] = None) -> None: """Handle state changed.""" self._queue.signal_update() self.mass.create_task(self.save(changed_key)) - # TODO: restart play if setting changed that impacts playing queue async def save(self, changed_key: Optional[str] = None) -> None: """Save state in db."""