Make Announce feature more robust (#409)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Tue, 12 Jul 2022 21:08:27 +0000 (23:08 +0200)
committerGitHub <noreply@github.com>
Tue, 12 Jul 2022 21:08:27 +0000 (23:08 +0200)
* Rename alert to announce (use same naming as HA)

* Make the Announce/alert feature more robust

- recover from unexpected conditions
- fix fade-in
- use silence stream to detect end of playback
- handle edge cases (discovered so far)

* prevent queue command while announcement busy

* create setting for announce volume

* fix wrong loudness for url provider

music_assistant/controllers/music/__init__.py
music_assistant/controllers/streams.py
music_assistant/helpers/audio.py
music_assistant/helpers/database.py
music_assistant/helpers/tags.py
music_assistant/models/enums.py
music_assistant/models/player.py
music_assistant/models/player_queue.py
music_assistant/models/queue_settings.py

index 06a3c3a6ef2b7c08834896d7942f383ea46a1ae2..4c25de4095b9cfbc08146c2b64d8309123231ad8 100755 (executable)
@@ -329,6 +329,9 @@ class MusicController:
     async def get_provider_loudness(self, provider: ProviderType) -> float | None:
         """Get average integrated loudness for tracks of given provider."""
         all_items = []
+        if provider == ProviderType.URL:
+            # this is not a very good idea for random urls
+            return None
         for db_row in await self.mass.database.get_rows(
             TABLE_TRACK_LOUDNESS,
             {
index 15fa89b8f77c1ececa795efdc7e065432b275441..16a854c8f4b7d0cddda9d8da139ae74b39c630fc 100644 (file)
@@ -14,7 +14,6 @@ from aiohttp import web
 from music_assistant.helpers.audio import (
     check_audio_support,
     crossfade_pcm_parts,
-    fadein_pcm_part,
     get_chunksize,
     get_media_stream,
     get_preview_stream,
@@ -76,6 +75,14 @@ class StreamsController:
         """Return url to control endpoint."""
         return f"{self.base_url}/{queue_id}/{control}"
 
+    def get_silence_url(
+        self,
+        content_type: ContentType = ContentType.WAV,
+    ) -> str:
+        """Generate stream url for a silence Stream."""
+        ext = content_type.value
+        return f"{self.base_url}/silence.{ext}"
+
     async def setup(self) -> None:
         """Async initialize of module."""
         app = web.Application()
@@ -147,7 +154,7 @@ class StreamsController:
     @staticmethod
     async def serve_silence(request: web.Request):
         """Serve some nice silence."""
-        duration = int(request.query.get("duration", 60))
+        duration = int(request.query.get("duration", 3600))
         fmt = ContentType.try_parse(request.match_info["fmt"])
 
         resp = web.StreamResponse(
@@ -238,7 +245,6 @@ class StreamsController:
         seek_position: int,
         fade_in: bool,
         output_format: ContentType,
-        is_alert: bool,
     ) -> QueueStream:
         """Start running a queue stream."""
         # generate unique stream url
@@ -252,8 +258,8 @@ class StreamsController:
         streamdetails = await get_stream_details(self.mass, first_item, queue.queue_id)
 
         # work out pcm details
-        if is_alert:
-            pcm_sample_rate = 41000
+        if streamdetails.media_type == MediaType.ANNOUNCEMENT:
+            pcm_sample_rate = 44100
             pcm_bit_depth = 16
             pcm_channels = 2
             allow_resample = True
@@ -284,7 +290,6 @@ class StreamsController:
             pcm_bit_depth=pcm_bit_depth,
             pcm_channels=pcm_channels,
             allow_resample=allow_resample,
-            is_alert=is_alert,
             autostart=True,
         )
         # cleanup stale previous queue tasks
@@ -317,7 +322,6 @@ class QueueStream:
         pcm_channels: int = 2,
         pcm_floating_point: bool = False,
         allow_resample: bool = False,
-        is_alert: bool = False,
         autostart: bool = False,
     ):
         """Init QueueStreamJob instance."""
@@ -332,7 +336,6 @@ class QueueStream:
         self.pcm_channels = pcm_channels
         self.pcm_floating_point = pcm_floating_point
         self.allow_resample = allow_resample
-        self.is_alert = is_alert
         self.url = queue.mass.streams.get_stream_url(stream_id, output_format)
 
         self.mass = queue.mass
@@ -422,6 +425,12 @@ class QueueStream:
             # add metadata
             "-metadata",
             "title=Streaming from Music Assistant",
+        ]
+        # fade-in if needed
+        if self.fade_in:
+            ffmpeg_args += ["-af", "afade=t=in:st=0:d=5"]
+
+        ffmpeg_args += [
             # output args
             "-f",
             self.output_format.value,
@@ -513,7 +522,6 @@ class QueueStream:
             if track_count == 1:
                 queue_index = self.start_index
                 seek_position = self.seek_position
-                fade_in = self.fade_in
             else:
                 next_index = self.queue.get_next_index(queue_index)
                 # break here if repeat is enabled
@@ -522,7 +530,6 @@ class QueueStream:
                     break
                 queue_index = next_index
                 seek_position = 0
-                fade_in = False
             self.index_in_buffer = queue_index
             # send signal that we've loaded a new track into the buffer
             self.queue.signal_update()
@@ -567,9 +574,16 @@ class QueueStream:
                 break
 
             # check crossfade ability
-            use_crossfade = self.queue.settings.crossfade_mode != CrossFadeMode.DISABLED
+            use_crossfade = (
+                self.queue.settings.crossfade_mode != CrossFadeMode.DISABLED
+                and self.queue.settings.crossfade_duration > 0
+                and streamdetails.media_type != MediaType.ANNOUNCEMENT
+            )
+            # do not crossfade tracks of same album
             if (
-                prev_track is not None
+                use_crossfade
+                and self.queue.settings.crossfade_mode != CrossFadeMode.ALWAYS
+                and prev_track is not None
                 and prev_track.media_type == MediaType.TRACK
                 and queue_track.media_type == MediaType.TRACK
             ):
@@ -580,33 +594,35 @@ class QueueStream:
                     and new_item.album is not None
                     and prev_item.album == new_item.album
                 ):
+                    self.logger.debug("Skipping crossfade: Tracks are from same album")
                     use_crossfade = False
             prev_track = queue_track
 
-            # calculate sample_size based on PCM params for 200ms of audio
+            # calculate sample_size based on PCM params for 1 second of audio
             input_format = ContentType.from_bit_depth(
                 self.pcm_bit_depth, self.pcm_floating_point
             )
-            sample_size = get_chunksize(
+            sample_size_per_second = get_chunksize(
                 input_format,
                 self.pcm_sample_rate,
                 self.pcm_bit_depth,
                 self.pcm_channels,
             )
-            # buffer size is duration of crossfade + 3 seconds
-            crossfade_duration = self.queue.settings.crossfade_duration or fade_in or 1
-            crossfade_size = (sample_size * 5) * crossfade_duration
-            buf_size = (sample_size * 5) * (crossfade_duration * 3)
-            total_size = (sample_size * 5) * (queue_track.duration or 0)
+            crossfade_duration = self.queue.settings.crossfade_duration
+            crossfade_size = sample_size_per_second * crossfade_duration
+            # buffer_duration has some overhead to account for padded silence
+            buffer_duration = (crossfade_duration or 2) * 2
+            # predict total size to expect for this track from duration
+            stream_duration = (queue_track.duration or 0) - seek_position
 
             self.logger.info(
-                "Start Streaming queue track: %s (%s) for queue %s",
+                "Start Streaming queue track: %s (%s) for queue %s - crossfade: %s",
                 queue_track.uri,
                 queue_track.name,
                 self.queue.player.name,
+                use_crossfade,
             )
             queue_track.streamdetails.seconds_skipped = seek_position
-            chunk_count = 0
             buffer = b""
             bytes_written = 0
             # handle incoming audio chunks
@@ -617,8 +633,11 @@ class QueueStream:
                 sample_rate=self.pcm_sample_rate,
                 channels=self.pcm_channels,
                 seek_position=seek_position,
+                chunk_size=sample_size_per_second,
             ):
-                chunk_count += 1
+
+                seconds_streamed = bytes_written / sample_size_per_second
+                seconds_in_buffer = len(buffer) / sample_size_per_second
 
                 ####  HANDLE FIRST PART OF TRACK
 
@@ -628,96 +647,92 @@ class QueueStream:
                     queue_track.streamdetails.seconds_streamed = 0
                     break
 
-                # track has no duration or duration < 30s: pypass any further processing
-                if queue_track.duration is None or queue_track.duration < 30:
-                    bytes_written += len(chunk)
+                # bypass any processing for radiostreams and announcements
+                if (
+                    streamdetails.media_type == MediaType.ANNOUNCEMENT
+                    or not stream_duration
+                ):
                     yield chunk
-                    continue
-
-                # first part of track and we need to (cross)fade: fill buffer
-                if bytes_written < buf_size and (last_fadeout_part or fade_in):
-                    bytes_written += len(chunk)
-                    buffer += chunk
-                    continue
-
-                # last part of track: fill buffer
-                if bytes_written >= (total_size - buf_size):
                     bytes_written += len(chunk)
-                    buffer += chunk
                     continue
 
-                # buffer full for fade-in / crossfade
-                if buffer and (last_fadeout_part or fade_in):
-                    # strip silence of start and create fade-in part
+                # buffer full for crossfade
+                if last_fadeout_part and (seconds_in_buffer >= buffer_duration):
+                    # strip silence of start
                     first_part = await strip_silence(
                         buffer + chunk, pcm_fmt, self.pcm_sample_rate
                     )
-
-                    if last_fadeout_part:
-                        # crossfade
-                        fadein_part = first_part[:crossfade_size]
-                        remaining_bytes = first_part[crossfade_size:]
-                        crossfade_part = await crossfade_pcm_parts(
-                            fadein_part,
-                            last_fadeout_part,
-                            crossfade_duration,
-                            pcm_fmt,
-                            self.pcm_sample_rate,
-                        )
-                        # send crossfade_part
-                        yield crossfade_part
-                        bytes_written += len(crossfade_part)
-                        # also write the leftover bytes from the strip action
-                        if remaining_bytes:
-                            yield remaining_bytes
-                            bytes_written += len(remaining_bytes)
-                    else:
-                        # fade-in
-                        fadein_part = await fadein_pcm_part(
-                            first_part,
-                            fade_in,
-                            pcm_fmt,
-                            self.pcm_sample_rate,
-                        )
-                        yield fadein_part
-                        bytes_written += len(fadein_part)
+                    # perform crossfade
+                    fadein_part = first_part[:crossfade_size]
+                    remaining_bytes = first_part[crossfade_size:]
+                    crossfade_part = await crossfade_pcm_parts(
+                        fadein_part,
+                        last_fadeout_part,
+                        crossfade_duration,
+                        pcm_fmt,
+                        self.pcm_sample_rate,
+                    )
+                    # send crossfade_part
+                    yield crossfade_part
+                    bytes_written += len(crossfade_part)
+                    # also write the leftover bytes from the strip action
+                    if remaining_bytes:
+                        yield remaining_bytes
+                        bytes_written += len(remaining_bytes)
 
                     # clear vars
                     last_fadeout_part = b""
                     buffer = b""
                     continue
 
-                # all other: middle of track or no fade actions, just yield the audio
-                bytes_written += len(chunk)
+                # first part of track and we need to crossfade: fill buffer
+                if last_fadeout_part:
+                    buffer += chunk
+                    continue
+
+                # last part of track: fill buffer
+                if buffer or (seconds_streamed >= (stream_duration - buffer_duration)):
+                    buffer += chunk
+                    continue
+
+                # all other: middle of track or no crossfade action, just yield the audio
                 yield chunk
+                bytes_written += len(chunk)
                 continue
 
             #### HANDLE END OF TRACK
+            self.logger.debug(
+                "end of track reached - seconds_streamed: %s - seconds_in_buffer: %s - stream_duration: %s",
+                seconds_streamed,
+                seconds_in_buffer,
+                stream_duration,
+            )
 
             if buffer:
                 # strip silence from end of audio
                 last_part = await strip_silence(
                     buffer, pcm_fmt, self.pcm_sample_rate, reverse=True
                 )
-
-                # handle crossfading support
-                # store fade section to be picked up for next track
-
-                if use_crossfade:
-                    # crossfade is enabled, save fadeout part to pickup for next track
-                    last_part = last_part[-crossfade_size:]
+                # if crossfade is enabled, save fadeout part to pickup for next track
+                if use_crossfade and len(last_part) > crossfade_size:
+                    # yield remaining bytes from strip action,
+                    # we only need the crossfade_size part
+                    last_fadeout_part = last_part[-crossfade_size:]
                     remaining_bytes = last_part[:-crossfade_size]
-                    # yield remaining bytes
-                    bytes_written += len(remaining_bytes)
                     yield remaining_bytes
+                    bytes_written += len(remaining_bytes)
+                elif use_crossfade:
                     last_fadeout_part = last_part
                 else:
                     # no crossfade enabled, just yield the stripped audio data
-                    bytes_written += len(last_part)
                     yield last_part
+                    bytes_written += len(last_part)
 
-            # end of the track reached
-            queue_track.streamdetails.seconds_streamed = bytes_written / sample_size
+            # end of the track reached - store accurate duration
+            buffer = b""
+            queue_track.streamdetails.seconds_streamed = (
+                bytes_written / sample_size_per_second
+            )
             self.logger.debug(
                 "Finished Streaming queue track: %s (%s) on queue %s",
                 queue_track.uri,
index c754201642555e171c081092a9d8b2fd9317e0ad..444b722811679b3fd19eaa09ced372ecb073803c 100644 (file)
@@ -11,7 +11,7 @@ from time import time
 from typing import TYPE_CHECKING, AsyncGenerator, List, Optional, Tuple
 
 import aiofiles
-from aiohttp import ClientError, ClientTimeout
+from aiohttp import ClientTimeout
 
 from music_assistant.helpers.process import AsyncProcess, check_output
 from music_assistant.helpers.util import create_tempfile
@@ -81,51 +81,21 @@ async def crossfade_pcm_parts(
     ]
     async with AsyncProcess(args, True) as proc:
         crossfade_data, _ = await proc.communicate(fade_in_part)
+        if crossfade_data:
+            LOGGER.debug(
+                "crossfaded 2 pcm chunks. fade_in_part: %s - fade_out_part: %s - result: %s",
+                len(fade_in_part),
+                len(fade_out_part),
+                len(crossfade_data),
+            )
+            return crossfade_data
+        # no crossfade_data, return original data instead
         LOGGER.debug(
-            "crossfaded 2 pcm chunks. fade_in_part: %s - fade_out_part: %s - result: %s",
+            "crossfade of pcm chunks failed: not enough data. fade_in_part: %s - fade_out_part: %s",
             len(fade_in_part),
             len(fade_out_part),
-            len(crossfade_data),
         )
-        return crossfade_data
-
-
-async def fadein_pcm_part(
-    pcm_audio: bytes,
-    fade_length: int,
-    fmt: ContentType,
-    sample_rate: int,
-    channels: int = 2,
-) -> bytes:
-    """Fadein chunk of pcm/raw audio using ffmpeg."""
-    args = [
-        # generic args
-        "ffmpeg",
-        "-hide_banner",
-        "-loglevel",
-        "quiet",
-        # fade_in part (stdin)
-        "-acodec",
-        fmt.name.lower(),
-        "-f",
-        fmt.value,
-        "-ac",
-        str(channels),
-        "-ar",
-        str(sample_rate),
-        "-i",
-        "-",
-        # filter args
-        "-af",
-        f"afade=type=in:start_time=0:duration={fade_length}",
-        # output args
-        "-f",
-        fmt.value,
-        "-",
-    ]
-    async with AsyncProcess(args, True) as proc:
-        result_audio, _ = await proc.communicate(pcm_audio)
-        return result_audio
+        return fade_out_part + fade_in_part
 
 
 async def strip_silence(
@@ -152,9 +122,15 @@ async def strip_silence(
     ]
     # filter args
     if reverse:
-        args += ["-af", "areverse,silenceremove=1:0:-50dB:detection=peak,areverse"]
+        args += [
+            "-af",
+            "areverse,atrim=start=0.2,silenceremove=start_periods=1:start_silence=0.1:start_threshold=0.02,areverse",
+        ]
     else:
-        args += ["-af", "silenceremove=1:0:-50dB:detection=peak"]
+        args += [
+            "-af",
+            "atrim=start=0.2,silenceremove=start_periods=1:start_silence=0.1:start_threshold=0.02",
+        ]
     # output args
     args += ["-f", fmt.value, "-"]
     async with AsyncProcess(args, True) as proc:
@@ -251,29 +227,29 @@ async def get_stream_details(
         queue_item.streamdetails.seconds_skipped = 0
         queue_item.streamdetails.seconds_streamed = 0
         streamdetails = queue_item.streamdetails
-
-    # fetch streamdetails from provider
-    # always request the full item as there might be other qualities available
-    full_item = await mass.music.get_item_by_uri(queue_item.uri)
-    # sort by quality and check track availability
-    for prov_media in sorted(
-        full_item.provider_ids, key=lambda x: x.quality or 0, reverse=True
-    ):
-        if not prov_media.available:
-            continue
-        # get streamdetails from provider
-        music_prov = mass.music.get_provider(prov_media.prov_id)
-        if not music_prov or not music_prov.available:
-            continue  # provider temporary unavailable ?
-        try:
-            streamdetails: StreamDetails = await music_prov.get_stream_details(
-                prov_media.item_id
-            )
-            streamdetails.content_type = ContentType(streamdetails.content_type)
-        except MusicAssistantError as err:
-            LOGGER.warning(str(err))
-        else:
-            break
+    else:
+        # fetch streamdetails from provider
+        # always request the full item as there might be other qualities available
+        full_item = await mass.music.get_item_by_uri(queue_item.uri)
+        # sort by quality and check track availability
+        for prov_media in sorted(
+            full_item.provider_ids, key=lambda x: x.quality or 0, reverse=True
+        ):
+            if not prov_media.available:
+                continue
+            # get streamdetails from provider
+            music_prov = mass.music.get_provider(prov_media.prov_id)
+            if not music_prov or not music_prov.available:
+                continue  # provider temporary unavailable ?
+            try:
+                streamdetails: StreamDetails = await music_prov.get_stream_details(
+                    prov_media.item_id
+                )
+                streamdetails.content_type = ContentType(streamdetails.content_type)
+            except MusicAssistantError as err:
+                LOGGER.warning(str(err))
+            else:
+                break
 
     if not streamdetails:
         raise MediaNotFoundError(f"Unable to retrieve streamdetails for {queue_item}")
@@ -382,6 +358,7 @@ async def get_media_stream(
     sample_rate: int,
     channels: int = 2,
     seek_position: int = 0,
+    chunk_size: int = 64000,
 ) -> AsyncGenerator[bytes, None]:
     """Get the PCM audio stream for the given streamdetails."""
     assert pcm_fmt.is_pcm(), "Output format must be a PCM type"
@@ -409,9 +386,8 @@ async def get_media_stream(
         ffmpeg_proc.attach_task(writer())
 
         # yield chunks from stdout
-        sample_size = get_chunksize(pcm_fmt, sample_rate, 24, channels, 10)
         try:
-            async for chunk in ffmpeg_proc.iter_any(sample_size):
+            async for chunk in ffmpeg_proc.iter_chunked(chunk_size):
                 yield chunk
 
         except (asyncio.CancelledError, GeneratorExit) as err:
@@ -436,47 +412,33 @@ async def get_radio_stream(
 ) -> AsyncGenerator[bytes, None]:
     """Get radio audio stream from HTTP, including metadata retrieval."""
     headers = {"Icy-MetaData": "1"}
-    timeout = ClientTimeout(total=0, connect=10, sock_read=10)
-    reconnects = 0
-    while True:
-        # in loop to reconnect on connection failure
-        try:
-            LOGGER.debug("radio stream (re)connecting to: %s", url)
-            async with mass.http_session.get(
-                url, headers=headers, timeout=timeout
-            ) as resp:
-                headers = resp.headers
-                meta_int = int(headers.get("icy-metaint", "0"))
-                # stream with ICY Metadata
-                if meta_int:
-                    while True:
-                        audio_chunk = await resp.content.readexactly(meta_int)
-                        yield audio_chunk
-                        meta_byte = await resp.content.readexactly(1)
-                        meta_length = ord(meta_byte) * 16
-                        meta_data = await resp.content.readexactly(meta_length)
-                        if not meta_data:
-                            continue
-                        meta_data = meta_data.rstrip(b"\0")
-                        stream_title = re.search(rb"StreamTitle='([^']*)';", meta_data)
-                        if not stream_title:
-                            continue
-                        stream_title = stream_title.group(1).decode()
-                        if stream_title != streamdetails.stream_title:
-                            streamdetails.stream_title = stream_title
-                            if queue := mass.players.get_player_queue(
-                                streamdetails.queue_id
-                            ):
-                                queue.signal_update()
-                # Regular HTTP stream
-                else:
-                    async for chunk in resp.content.iter_any():
-                        yield chunk
-        except (asyncio.exceptions.TimeoutError, ClientError) as err:
-            # reconnect on http error (max 5 times)
-            if reconnects >= 5:
-                raise err
-            reconnects += 1
+    timeout = ClientTimeout(total=0, connect=30, sock_read=120)
+    async with mass.http_session.get(url, headers=headers, timeout=timeout) as resp:
+        headers = resp.headers
+        meta_int = int(headers.get("icy-metaint", "0"))
+        # stream with ICY Metadata
+        if meta_int:
+            while True:
+                audio_chunk = await resp.content.readexactly(meta_int)
+                yield audio_chunk
+                meta_byte = await resp.content.readexactly(1)
+                meta_length = ord(meta_byte) * 16
+                meta_data = await resp.content.readexactly(meta_length)
+                if not meta_data:
+                    continue
+                meta_data = meta_data.rstrip(b"\0")
+                stream_title = re.search(rb"StreamTitle='([^']*)';", meta_data)
+                if not stream_title:
+                    continue
+                stream_title = stream_title.group(1).decode()
+                if stream_title != streamdetails.stream_title:
+                    streamdetails.stream_title = stream_title
+                    if queue := mass.players.get_player_queue(streamdetails.queue_id):
+                        queue.signal_update()
+        # Regular HTTP stream
+        else:
+            async for chunk in resp.content.iter_any():
+                yield chunk
 
 
 async def get_http_stream(
@@ -504,7 +466,8 @@ async def get_http_stream(
     buffer = b""
     buffer_all = False
     bytes_received = 0
-    async with mass.http_session.get(url, headers=headers) as resp:
+    timeout = ClientTimeout(total=0, connect=30, sock_read=120)
+    async with mass.http_session.get(url, headers=headers, timeout=timeout) as resp:
         is_partial = resp.status == 206
         buffer_all = seek_position and not is_partial
         async for chunk in resp.content.iter_any():
index 54a10eea28a7747411ae9c43be3e49cf56a07a50..3f237dd4dc90049c218c411b1a5da4f21f3e535e 100755 (executable)
@@ -49,7 +49,9 @@ class Database:
 
     async def get_setting(self, key: str) -> str | None:
         """Get setting from settings table."""
-        return await self.get_row(TABLE_SETTINGS, {"key": key})
+        if db_row := await self.get_row(TABLE_SETTINGS, {"key": key}):
+            return db_row["value"]
+        return None
 
     async def set_setting(self, key: str, value: str) -> None:
         """Set setting in settings table."""
@@ -177,7 +179,7 @@ class Database:
         await self.__create_database_tables()
         try:
             if prev_version := await self.get_setting("version"):
-                prev_version = int(prev_version["value"])
+                prev_version = int(prev_version)
             else:
                 prev_version = 0
         except (KeyError, ValueError):
index 1a02a5d62a69af83bbc904f8edc9b874289403e3..fa4547fa5b85aef3bb22ef7ad8d8f2cb4db43a0c 100644 (file)
@@ -151,7 +151,7 @@ class AudioTags:
         # convert all tag-keys to lowercase without spaces
         tags = {
             key.lower().replace(" ", "").replace("_", ""): value
-            for key, value in raw["format"]["tags"].items()
+            for key, value in raw["format"].get("tags", {}).items()
         }
 
         return AudioTags(
index 65167877e271b79adc17a8315357e0dffbb5d3db..3e7b91e1f95322eaeef20984c412fdbd1dea4661 100644 (file)
@@ -12,6 +12,7 @@ class MediaType(Enum):
     PLAYLIST = "playlist"
     RADIO = "radio"
     FOLDER = "folder"
+    ANNOUNCEMENT = "announcement"
     UNKNOWN = "unknown"
 
 
index 43ee4c1efe9cbd66de50f5f53b9b27616537dce6..358402861800355c13c1eb78cebcd52e7f054e20 100755 (executable)
@@ -4,7 +4,7 @@ from __future__ import annotations
 import asyncio
 from abc import ABC
 from dataclasses import dataclass
-from typing import TYPE_CHECKING, Any, Dict, List, Tuple
+from typing import TYPE_CHECKING, Any, Dict, List
 
 from mashumaro import DataClassDictMixin
 
@@ -42,8 +42,8 @@ class Player(ABC):
     _attr_volume_level: int = 100
     _attr_volume_muted: bool = False
     _attr_device_info: DeviceInfo = DeviceInfo()
-    _attr_default_sample_rates: Tuple[int] = (44100, 48000, 88200, 96000)
-    _attr_default_stream_type: ContentType = ContentType.FLAC
+    _attr_max_sample_rate: int = 96000
+    _attr_stream_type: ContentType = ContentType.FLAC
     # below objects will be set by playermanager at register/update
     mass: MusicAssistant = None  # type: ignore[assignment]
     _prev_state: dict = {}
@@ -125,15 +125,15 @@ class Player(ABC):
     # DEFAULT PLAYER SETTINGS
 
     @property
-    def default_sample_rates(self) -> Tuple[int]:
-        """Return the default supported sample rates."""
+    def max_sample_rate(self) -> int:
+        """Return the (default) max supported sample rate."""
         # if a player does not report/set its supported sample rates, we use a pretty safe default
-        return self._attr_default_sample_rates
+        return self._attr_max_sample_rate
 
     @property
-    def default_stream_type(self) -> ContentType:
-        """Return the default content type to use for streaming."""
-        return self._attr_default_stream_type
+    def stream_type(self) -> ContentType:
+        """Return the default/preferred content type to use for streaming."""
+        return self._attr_stream_type
 
     # GROUP PLAYER ATTRIBUTES AND METHODS (may be overridden if needed)
     # a player can optionally be a group leader (e.g. Sonos)
index a81bfeec642717358949a0f0ad2bfb888ced1d32..6e9d0403370de626b16bc6c613d976e4fbbccdcb 100644 (file)
@@ -9,6 +9,7 @@ from dataclasses import dataclass
 from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
 
 from music_assistant.models.enums import (
+    ContentType,
     EventType,
     MediaType,
     ProviderType,
@@ -17,7 +18,11 @@ from music_assistant.models.enums import (
 )
 from music_assistant.models.errors import MediaNotFoundError, MusicAssistantError
 from music_assistant.models.event import MassEvent
-from music_assistant.models.media_items import MediaItemType, media_from_dict
+from music_assistant.models.media_items import (
+    MediaItemType,
+    StreamDetails,
+    media_from_dict,
+)
 
 from .player import Player, PlayerState
 from .queue_item import QueueItem
@@ -46,8 +51,8 @@ class QueueSnapShot:
     items: List[QueueItem]
     index: Optional[int]
     position: int
-    repeat: RepeatMode
-    shuffle: bool
+    settings: dict
+    volume_level: int
 
 
 class PlayerQueue:
@@ -70,6 +75,7 @@ class PlayerQueue:
         self._last_player_update: int = 0
         self._last_stream_id: str = ""
         self._snapshot: Optional[QueueSnapShot] = None
+        self._announcement_in_progress: bool = False
 
     async def setup(self) -> None:
         """Handle async setup of instance."""
@@ -105,8 +111,9 @@ class PlayerQueue:
         if not self._stream_id:
             return None
         if stream := self.mass.streams.queue_streams.get(self._stream_id):
-            return stream.index_in_buffer
-        return None
+            if not stream.done.is_set():
+                return stream.index_in_buffer
+        return self.current_index
 
     @property
     def active(self) -> bool:
@@ -198,6 +205,9 @@ class PlayerQueue:
                 QueueOption.ADD -> Append new items at end of the queue
             :param passive: if passive set to true the stream url will not be sent to the player.
         """
+        if self._announcement_in_progress:
+            self.logger.warning("Ignore queue command: An announcement is in progress")
+            return
         # a single item or list of items may be provided
         if not isinstance(media, list):
             media = [media]
@@ -262,76 +272,114 @@ class PlayerQueue:
         elif queue_opt == QueueOption.ADD:
             await self.append(queue_items)
 
-    async def play_alert(
-        self, uri: str, announce: bool = False, gain_correct: int = 6
-    ) -> str:
+    async def play_announcement(self, url: str, prepend_alert: bool = False) -> str:
         """
-        Play given uri as Alert on the queue.
+        Play given uri as Announcement on the queue.
 
-        uri: Uri that should be played as announcement, can be Music Assistant URI or plain url.
-        announce: Prepend the (TTS) alert with a small announce sound.
-        gain_correct: Adjust the gain of the alert sound (in dB).
+        url: URL that should be played as announcement, can only be plain url.
+        prepend_alert: Prepend the (TTS) announcement with an alert bell sound.
         """
-        assert not self._snapshot, "Alert already in progress"
+        if self._announcement_in_progress:
+            self.logger.warning(
+                "Ignore queue command: An announcement is (already) in progress"
+            )
+            return
 
-        # create snapshot
-        await self.snapshot_create()
+        def create_announcement(_url: str):
+            return QueueItem(
+                uri=_url,
+                name="announcement",
+                duration=30,
+                streamdetails=StreamDetails(
+                    provider=ProviderType.URL,
+                    item_id=_url,
+                    content_type=ContentType.try_parse(_url),
+                    media_type=MediaType.ANNOUNCEMENT,
+                    loudness=0,
+                    gain_correct=4,
+                    data=_url,
+                ),
+                media_type=MediaType.ANNOUNCEMENT,
+            )
 
-        queue_items = []
+        try:
+            # create snapshot
+            await self.snapshot_create()
+            # stop player if needed
+            if self.active and self.player.state in (
+                PlayerState.PLAYING,
+                PlayerState.PAUSED,
+            ):
+                await self.stop()
+                await self._wait_for_state((PlayerState.OFF, PlayerState.IDLE))
+                self._announcement_in_progress = True
+
+            # turn on player if needed
+            if not self.player.powered:
+                await self.player.power(True)
+                await self._wait_for_state(PlayerState.IDLE)
+
+            # adjust volume if needed
+            if self._settings.announce_volume_increase:
+                announce_volume = (
+                    self.player.volume_level + self._settings.announce_volume_increase
+                )
+                announce_volume = min(announce_volume, 100)
+                announce_volume = max(announce_volume, 0)
+                await self.player.volume_set(announce_volume)
+
+            # adjust queue settings for announce playback
+            self._settings.from_dict(
+                {
+                    "repeat_mode": "off",
+                    "shuffle_enabled": False,
+                }
+            )
 
-        # prepend annnounce sound if needed
-        if announce:
-            url_prov = self.mass.music.get_provider(ProviderType.URL)
-            media_item = await url_prov.parse_item(ALERT_ANNOUNCE_FILE)
-            queue_item = QueueItem.from_media_item(media_item)
-            queue_items.append(queue_item)
+            queue_items = []
+            # prepend alert sound if needed
+            if prepend_alert:
+                queue_items.append(create_announcement(ALERT_ANNOUNCE_FILE))
 
-        # parse provided uri into a MA MediaItem
-        try:
-            media_item = await self.mass.music.get_item_by_uri(uri)
-            queue_items.append(QueueItem.from_media_item(media_item))
-        except MusicAssistantError as err:
-            # invalid MA uri or item not found error
-            raise MediaNotFoundError(f"Invalid uri: {uri}") from err
+            queue_items.append(create_announcement(url))
 
-        # start queue with alert sound(s)
-        self._items = queue_items
-        stream = await self.queue_stream_start(
-            start_index=0, seek_position=0, fade_in=False, is_alert=True
-        )
-        # execute the play command on the player(s)
-        await self.player.play_url(stream.url)
+            # append silence track. we use that as a reliable way to make sure
+            # there is enough buffer for the player to start quickly
+            # and to detect when we finished playing the alert
+            silence_item = create_announcement(self.mass.streams.get_silence_url())
+            queue_items.append(silence_item)
 
-        # wait for the player to finish playing
-        play_started = asyncio.Event()
-        play_stopped = asyncio.Event()
+            # start queue with announcement sound(s)
+            self._items = queue_items
+            stream = await self.queue_stream_start(start_index=0, seek_position=0)
+            # execute the play command on the player(s)
+            await self.player.play_url(stream.url)
 
-        def handle_event(evt: MassEvent):
-            if self.player.state == PlayerState.PLAYING:
-                play_started.set()
-            elif play_started.is_set():
-                play_stopped.set()
+            # wait for the player to finish playing
+            await asyncio.sleep(5)
+            await self._wait_for_state(PlayerState.PLAYING, silence_item.item_id)
 
-        unsub = self.mass.subscribe(
-            handle_event, EventType.QUEUE_UPDATED, self.queue_id
-        )
-        try:
-            await asyncio.wait_for(play_stopped.wait(), 30)
-        except asyncio.TimeoutError:
-            self.logger.warning("Timeout while playing alert")
+        except Exception as err:  # pylint: disable=broad-except
+            self.logger.exception("Error while playing announcement", exc_info=err)
         finally:
-            unsub()
             # restore queue
+            self._announcement_in_progress = False
             await self.snapshot_restore()
 
     async def stop(self) -> None:
         """Stop command on queue player."""
+        if self._announcement_in_progress:
+            self.logger.warning("Ignore queue command: An announcement is in progress")
+            return
         self.signal_next = False
         # redirect to underlying player
         await self.player.stop()
 
     async def play(self) -> None:
         """Play (unpause) command on queue player."""
+        if self._announcement_in_progress:
+            self.logger.warning("Ignore queue command: An announcement is in progress")
+            return
         if self.active and self.player.state == PlayerState.PAUSED:
             await self.player.play()
         else:
@@ -339,6 +387,9 @@ class PlayerQueue:
 
     async def pause(self) -> None:
         """Pause command on queue player."""
+        if self._announcement_in_progress:
+            self.logger.warning("Ignore queue command: An announcement is in progress")
+            return
         # redirect to underlying player
         await self.player.pause()
 
@@ -391,7 +442,7 @@ class PlayerQueue:
 
         if resume_item is not None:
             resume_pos = resume_pos if resume_pos > 10 else 0
-            fade_in = 5 if resume_pos else 0
+            fade_in = resume_pos > 0
             await self.play_index(resume_item.item_id, resume_pos, fade_in)
         elif len(self._items) > 0:
             # items available in queue but no previous track, start at 0
@@ -410,39 +461,48 @@ class PlayerQueue:
             items=self._items,
             index=self._current_index,
             position=self._current_item_elapsed_time,
-            repeat=self._settings.repeat_mode,
-            shuffle=self._settings.shuffle_enabled,
+            settings=self._settings.to_dict(),
+            volume_level=self.player.volume_level,
         )
 
     async def snapshot_restore(self) -> None:
         """Restore snapshot of Queue state."""
         assert self._snapshot, "Create snapshot before restoring it."
-        # clear queue first
-        await self.clear()
-        # restore queue
-        self._settings.repeat_mode = self._snapshot.repeat
-        self._settings.shuffle_enabled = self._snapshot.shuffle
-        await self._update_items(self._snapshot.items)
-        self._current_index = self._snapshot.index
-        self._current_item_elapsed_time = self._snapshot.position
-        if self._snapshot.state in (PlayerState.PLAYING, PlayerState.PAUSED):
-            await self.resume()
-        if self._snapshot.state == PlayerState.PAUSED:
-            await self.pause()
-        if not self._snapshot.powered:
-            await self.player.power(False)
-        # reset snapshot once restored
-        self.logger.debug("Restored snapshot...")
-        self._snapshot = None
+        try:
+            # clear queue first
+            await self.clear()
+            # restore volume if needed
+            if self._snapshot.volume_level != self.player.volume_level:
+                await self.player.volume_set(self._snapshot.volume_level)
+            # restore queue
+            self._settings.from_dict(self._snapshot.settings)
+            await self.update_items(self._snapshot.items)
+            self._current_index = self._snapshot.index
+            self._current_item_elapsed_time = self._snapshot.position
+            if self._snapshot.state in (PlayerState.PLAYING, PlayerState.PAUSED):
+                await self.resume()
+            if self._snapshot.state == PlayerState.PAUSED:
+                await self.pause()
+            if not self._snapshot.powered:
+                await self.player.power(False)
+            # reset snapshot once restored
+            self.logger.debug("Restored snapshot...")
+        except Exception as err:  # pylint: disable=broad-except
+            self.logger.exception("Error while restoring snapshot", exc_info=err)
+        finally:
+            self._snapshot = None
 
     async def play_index(
         self,
         index: Union[int, str],
         seek_position: int = 0,
-        fade_in: int = 0,
+        fade_in: bool = False,
         passive: bool = False,
     ) -> None:
         """Play item at index (or item_id) X in queue."""
+        if self._announcement_in_progress:
+            self.logger.warning("Ignore queue command: An announcement is in progress")
+            return
         if not isinstance(index, int):
             index = self.index_by_id(index)
         if index is None:
@@ -480,7 +540,7 @@ class PlayerQueue:
             return
         # move the item in the list
         items.insert(new_index, items.pop(item_index))
-        await self._update_items(items)
+        await self.update_items(items)
 
     async def delete_item(self, queue_item_id: str) -> None:
         """Delete item (by id or index) from the queue."""
@@ -563,13 +623,13 @@ class PlayerQueue:
             queue_items = played_items + cur_item + next_items
         else:
             queue_items = self._items + queue_items
-        await self._update_items(queue_items)
+        await self.update_items(queue_items)
 
     async def clear(self) -> None:
         """Clear all items in the queue."""
         if self.player.state not in (PlayerState.IDLE, PlayerState.OFF):
             await self.stop()
-        await self._update_items([])
+        await self.update_items([])
 
     def on_player_update(self) -> None:
         """Call when player updates."""
@@ -600,6 +660,8 @@ class PlayerQueue:
         """Update queue details, called when player updates."""
         if self.player.active_queue != self:
             return
+        if not self.active:
+            return
         new_index = self._current_index
         track_time = self._current_item_elapsed_time
         new_item_loaded = False
@@ -636,14 +698,10 @@ class PlayerQueue:
     async def queue_stream_start(
         self,
         start_index: int,
-        seek_position: int,
-        fade_in: bool,
-        is_alert: bool = False,
+        seek_position: int = 0,
+        fade_in: bool = False,
     ) -> QueueStream:
         """Start the queue stream runner."""
-        self._current_item_elapsed_time = 0
-        self._current_index = start_index
-
         # start the queue stream background task
         stream = await self.mass.streams.start_queue_stream(
             queue=self,
@@ -651,21 +709,20 @@ class PlayerQueue:
             seek_position=seek_position,
             fade_in=fade_in,
             output_format=self._settings.stream_type,
-            is_alert=is_alert,
         )
         self._stream_id = stream.stream_id
+        self._current_item_elapsed_time = 0
+        self._current_index = start_index
         return stream
 
     def get_next_index(self, cur_index: Optional[int]) -> int:
         """Return the next index for the queue, accounting for repeat settings."""
-        alert_active = self.stream and self.stream.is_alert
         # handle repeat single track
-        if not alert_active and self.settings.repeat_mode == RepeatMode.ONE:
+        if self.settings.repeat_mode == RepeatMode.ONE:
             return cur_index
         # handle repeat all
         if (
-            not alert_active
-            and self.settings.repeat_mode == RepeatMode.ALL
+            self.settings.repeat_mode == RepeatMode.ALL
             and self._items
             and cur_index == (len(self._items) - 1)
         ):
@@ -710,7 +767,7 @@ class PlayerQueue:
             "settings": self.settings.to_dict(),
         }
 
-    async def _update_items(self, queue_items: List[QueueItem]) -> None:
+    async def update_items(self, queue_items: List[QueueItem]) -> None:
         """Update the existing queue items, mostly caused by reordering."""
         self._items = queue_items
         self.signal_update(True)
@@ -778,3 +835,28 @@ class PlayerQueue:
                 "current_item_elapsed_time": self._current_item_elapsed_time,
             },
         )
+
+    async def _wait_for_state(
+        self,
+        state: Union[None, PlayerState, Tuple[PlayerState]],
+        queue_item_id: Optional[str] = None,
+        timeout: int = 30,
+    ) -> None:
+        """Wait for player(queue) to reach a specific state."""
+        if state is not None and not isinstance(state, tuple):
+            state = (state,)
+
+        count = 0
+        while count < timeout * 10:
+
+            if (state is None or self.player.state in state) and (
+                queue_item_id is None
+                or self.current_item
+                and self.current_item.item_id == queue_item_id
+            ):
+                return
+
+            count += 1
+            await asyncio.sleep(0.1)
+
+        raise TimeoutError(f"Timeout while waiting on state(s) {state}")
index f88cf8feca8256d382fa87e125b6a40fa7e1caa8..02e94f48e57f3bfeb94a4503e975ba424355606d 100644 (file)
@@ -3,7 +3,7 @@ from __future__ import annotations
 
 import asyncio
 import random
-from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple
+from typing import TYPE_CHECKING, Any, Dict, Optional
 
 from .enums import ContentType, CrossFadeMode, RepeatMode
 
@@ -24,8 +24,9 @@ class QueueSettings:
         self._crossfade_duration: int = 6
         self._volume_normalization_enabled: bool = True
         self._volume_normalization_target: int = -14
-        self._stream_type: Optional[ContentType] = None
-        self._sample_rates: Optional[Tuple[int]] = None
+        self._stream_type: ContentType = queue.player.stream_type
+        self._max_sample_rate: int = queue.player.max_sample_rate
+        self._announce_volume_increase: int = 15
 
     @property
     def repeat_mode(self) -> RepeatMode:
@@ -50,24 +51,29 @@ class QueueSettings:
         if not self._shuffle_enabled and enabled:
             # shuffle requested
             self._shuffle_enabled = True
-            if self._queue.current_index is not None:
-                played_items = self._queue.items[: self._queue.current_index]
-                next_items = self._queue.items[self._queue.current_index + 1 :]
+            cur_index = self._queue.index_in_buffer
+            cur_item = self._queue.get_item(cur_index)
+            if cur_item is not None:
+                played_items = self._queue.items[:cur_index]
+                next_items = self._queue.items[cur_index + 1 :]
                 # for now we use default python random function
                 # can be extended with some more magic based on last_played and stuff
                 next_items = random.sample(next_items, len(next_items))
-                items = played_items + [self._queue.current_item] + next_items
-                asyncio.create_task(self._queue.load(items))
+
+                items = played_items + [cur_item] + next_items
+                asyncio.create_task(self._queue.update_items(items))
                 self._on_update("shuffle_enabled")
         elif self._shuffle_enabled and not enabled:
             # unshuffle
             self._shuffle_enabled = False
-            if self._queue.current_index is not None:
-                played_items = self._queue.items[: self._queue.current_index]
-                next_items = self._queue.items[self._queue.current_index + 1 :]
+            cur_index = self._queue.index_in_buffer
+            cur_item = self._queue.get_item(cur_index)
+            if cur_item is not None:
+                played_items = self._queue.items[:cur_index]
+                next_items = self._queue.items[cur_index + 1 :]
                 next_items.sort(key=lambda x: x.sort_index, reverse=False)
-                items = played_items + [self._queue.current_item] + next_items
-                asyncio.create_task(self._queue.load(items))
+                items = played_items + [cur_item] + next_items
+                asyncio.create_task(self._queue.update_items(items))
                 self._on_update("shuffle_enabled")
 
     @property
@@ -126,9 +132,6 @@ class QueueSettings:
     @property
     def stream_type(self) -> ContentType:
         """Return supported/preferred stream type for this playerqueue."""
-        if self._stream_type is None:
-            # return player's default
-            return self._queue.player.default_stream_type
         return self._stream_type
 
     @stream_type.setter
@@ -139,24 +142,28 @@ class QueueSettings:
             self._on_update("stream_type")
 
     @property
-    def sample_rates(self) -> Tuple[int]:
-        """Return supported/preferred sample rate(s) for this playerqueue."""
-        if self._sample_rates is None:
-            # return player's default
-            return self._queue.player.default_sample_rates
-        return self._sample_rates
-
-    @sample_rates.setter
-    def sample_rates(self, value: ContentType) -> None:
+    def max_sample_rate(self) -> int:
+        """Return max supported/needed sample rate(s) for this playerqueue."""
+        return self._max_sample_rate
+
+    @max_sample_rate.setter
+    def max_sample_rate(self, value: ContentType) -> None:
         """Set supported/preferred sample rate(s) for this playerqueue."""
-        if self._stream_type != value:
-            self._stream_type = value
-            self._on_update("sample_rates")
+        if self._max_sample_rate != value:
+            self._max_sample_rate = value
+            self._on_update("max_sample_rate")
 
     @property
-    def max_sample_rate(self) -> int:
-        """Return the maximum samplerate supported by this playerqueue."""
-        return max(self.sample_rates)
+    def announce_volume_increase(self) -> int:
+        """Return announce_volume_increase setting (percentage relative to current)."""
+        return self._announce_volume_increase
+
+    @announce_volume_increase.setter
+    def announce_volume_increase(self, volume_increase: int) -> None:
+        """Set announce_volume_increase setting."""
+        if self._announce_volume_increase != volume_increase:
+            self._announce_volume_increase = volume_increase
+            self._on_update("announce_volume_increase")
 
     def to_dict(self) -> Dict[str, Any]:
         """Return dict from settings."""
@@ -168,31 +175,45 @@ class QueueSettings:
             "volume_normalization_enabled": self.volume_normalization_enabled,
             "volume_normalization_target": self.volume_normalization_target,
             "stream_type": self.stream_type.value,
-            "sample_rates": self.sample_rates,
+            "max_sample_rate": self.max_sample_rate,
+            "announce_volume_increase": self.announce_volume_increase,
         }
 
+    def from_dict(self, d: Dict[str, Any]) -> None:
+        """Initialize settings from dict."""
+        self._repeat_mode = RepeatMode(d.get("repeat_mode", self._repeat_mode.value))
+        self._shuffle_enabled = bool(d.get("shuffle_enabled", self._shuffle_enabled))
+        self._crossfade_mode = CrossFadeMode(
+            d.get("crossfade_mode", self._crossfade_mode.value)
+        )
+        self._crossfade_duration = int(
+            d.get("crossfade_duration", self._crossfade_duration)
+        )
+        self._volume_normalization_enabled = bool(
+            d.get("volume_normalization_enabled", self._volume_normalization_enabled)
+        )
+        self._volume_normalization_target = float(
+            d.get("volume_normalization_target", self._volume_normalization_target)
+        )
+        self._stream_type = ContentType(d.get("stream_type", self._stream_type.value))
+        self._max_sample_rate = int(d.get("max_sample_rate", self._max_sample_rate))
+        self._announce_volume_increase = int(
+            d.get("announce_volume_increase", self._announce_volume_increase)
+        )
+
     async def restore(self) -> None:
         """Restore state from db."""
-        for key, val_type in (
-            ("repeat_mode", RepeatMode),
-            ("crossfade_mode", CrossFadeMode),
-            ("shuffle_enabled", bool),
-            ("crossfade_duration", int),
-            ("volume_normalization_enabled", bool),
-            ("volume_normalization_target", float),
-            ("stream_type", ContentType),
-            ("sample_rates", tuple),
-        ):
+        values = {}
+        for key in self.to_dict():
             db_key = f"{self._queue.queue_id}_{key}"
             if db_value := await self.mass.database.get_setting(db_key):
-                value = val_type(db_value["value"])
-                setattr(self, f"_{key}", value)
+                values[key] = db_value
+        self.from_dict(values)
 
     def _on_update(self, changed_key: Optional[str] = None) -> None:
         """Handle state changed."""
         self._queue.signal_update()
         self.mass.create_task(self.save(changed_key))
-        # TODO: restart play if setting changed that impacts playing queue
 
     async def save(self, changed_key: Optional[str] = None) -> None:
         """Save state in db."""