Add play alert feature (#356)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Thu, 2 Jun 2022 22:41:51 +0000 (00:41 +0200)
committerGitHub <noreply@github.com>
Thu, 2 Jun 2022 22:41:51 +0000 (00:41 +0200)
* add play alert feature

* create constant for fallback duration

* improvements and fix buffers

* move seek to top

* fade in at resume

music_assistant/controllers/stream.py
music_assistant/helpers/audio.py
music_assistant/helpers/process.py
music_assistant/helpers/resources/announce.flac [new file with mode: 0644]
music_assistant/models/player_queue.py
music_assistant/models/queue_item.py

index 246dddb59174376b416692ece0bf4074c0e5dbdd..12031eca3a0db9a0d78ec1e3626202145c72ed62 100644 (file)
@@ -13,6 +13,7 @@ from music_assistant.helpers.audio import (
     check_audio_support,
     create_wave_header,
     crossfade_pcm_parts,
+    fadein_pcm_part,
     get_media_stream,
     get_preview_stream,
     get_sox_args_for_pcm_stream,
@@ -73,11 +74,16 @@ class StreamController:
         enc_track_id = urllib.parse.quote(track_id)
         return f"http://{self._ip}:{self._port}/preview?provider_id={provider.value}&item_id={enc_track_id}"
 
+    def get_silence_url(self, duration: int = 600) -> str:
+        """Return url to silence."""
+        return f"http://{self._ip}:{self._port}/silence?duration={duration}"
+
     async def setup(self) -> None:
         """Async initialize of module."""
         app = web.Application()
 
         app.router.add_get("/preview", self.serve_preview)
+        app.router.add_get("/silence", self.serve_silence)
         app.router.add_get(
             "/{queue_id}/{player_id}.{format}",
             self.serve_multi_client_queue_stream,
@@ -120,6 +126,19 @@ class StreamController:
 
         self.logger.info("Started stream server on port %s", self._port)
 
+    @staticmethod
+    async def serve_silence(request: web.Request):
+        """Serve silence."""
+        resp = web.StreamResponse(
+            status=200, reason="OK", headers={"Content-Type": "audio/wav"}
+        )
+        await resp.prepare(request)
+        duration = int(request.query.get("duration", 600))
+        await resp.write(create_wave_header(duration=duration))
+        for _ in range(0, duration):
+            await resp.write(b"\0" * 1764000)
+        return resp
+
     async def serve_preview(self, request: web.Request):
         """Serve short preview sample."""
         provider_id = request.query["provider_id"]
@@ -148,9 +167,7 @@ class StreamController:
             # send stop here to prevent the player from retrying over and over
             await queue.stop()
             # send some silence to allow the player to process the stop request
-            result = create_wave_header(duration=10)
-            result += b"\0" * 1764000
-            return web.Response(status=200, body=result, content_type="audio/wav")
+            return await self.serve_silence(request)
 
         resp = web.StreamResponse(
             status=200, reason="OK", headers={"Content-Type": f"audio/{fmt}"}
@@ -414,6 +431,7 @@ class StreamController:
         resample: bool = False,
     ) -> AsyncGenerator[None, bytes]:
         """Stream the PlayerQueue's tracks as constant feed of PCM raw audio."""
+        bytes_written_total = 0
         last_fadeout_data = b""
         queue_index = None
         track_count = 0
@@ -429,15 +447,15 @@ class StreamController:
 
         # stream queue tracks one by one
         while True:
-            start_timestamp = time()
             # get the (next) track in queue
             track_count += 1
             if track_count == 1:
                 # report start of queue playback so we can calculate current track/duration etc.
-                queue_index, seek_position = await queue.queue_stream_start()
+                queue_index, seek_position, fade_in = await queue.queue_stream_start()
             else:
                 queue_index = await queue.queue_stream_next(queue_index)
                 seek_position = 0
+                fade_in = 0
             queue_track = queue.get_item(queue_index)
             if not queue_track:
                 self.logger.debug(
@@ -524,19 +542,25 @@ class StreamController:
 
                 # HANDLE FIRST PART OF TRACK
                 if not chunk and bytes_written == 0 and is_last_chunk:
-                    # stream error: got empy first chunk
+                    # stream error: got empy first chunk ?!
                     self.logger.warning("Stream error on %s", queue_track.uri)
-                    # prevent player queue get stuck by just skipping to the next track
-                    queue_track.duration = 0
-                    continue
-                if cur_chunk <= 2 and not last_fadeout_data:
+                elif cur_chunk == 1 and last_fadeout_data:
+                    prev_chunk = chunk
+                    del chunk
+                elif cur_chunk == 1 and fade_in:
+                    # fadein first chunk
+                    fadein_first_part = await fadein_pcm_part(
+                        chunk, fade_in, pcm_fmt, sample_rate
+                    )
+                    yield fadein_first_part
+                    bytes_written += len(fadein_first_part)
+                    del chunk
+                    del fadein_first_part
+                elif cur_chunk <= 2 and not last_fadeout_data:
                     # no fadeout_part available so just pass it to the output directly
                     yield chunk
                     bytes_written += len(chunk)
                     del chunk
-                elif cur_chunk == 1 and last_fadeout_data:
-                    prev_chunk = chunk
-                    del chunk
                 # HANDLE CROSSFADE OF PREVIOUS TRACK FADE_OUT AND THIS TRACK FADE_IN
                 elif cur_chunk == 2 and last_fadeout_data:
                     # combine the first 2 chunks and strip off silence
@@ -617,15 +641,16 @@ class StreamController:
                     else:
                         prev_chunk = chunk
                     del chunk
-                # allow clients to only buffer max ~15 seconds ahead
-                seconds_streamed = bytes_written / sample_size
-                seconds_needed = int(time() - start_timestamp) + 15
-                diff = seconds_streamed - seconds_needed
-                if diff:
+                # allow clients to only buffer max ~10 seconds ahead
+                queue_track.streamdetails.seconds_played = bytes_written / sample_size
+                seconds_buffered = (bytes_written_total + bytes_written) / sample_size
+                seconds_needed = queue.player.elapsed_time + 10
+                diff = seconds_buffered - seconds_needed
+                track_time = queue_track.duration or 0
+                if track_time > 10 and diff > 1:
                     await asyncio.sleep(diff)
             # end of the track reached
-            # set actual duration to the queue for more accurate now playing info
-            queue_track.streamdetails.seconds_played = bytes_written / sample_size
+            bytes_written_total += bytes_written
             self.logger.debug(
                 "Finished Streaming queue track: %s (%s) on queue %s",
                 queue_track.uri,
index c17cb86ee2464ddbaf9eb0bc4742cb17ee48ec1a..d12d5afd9f3ea57df6287bf0c6a2e8ec3968eed3 100644 (file)
@@ -93,6 +93,34 @@ async def crossfade_pcm_parts(
     return crossfade_part
 
 
+async def fadein_pcm_part(
+    pcm_audio: bytes,
+    fade_length: int,
+    fmt: ContentType,
+    sample_rate: int,
+) -> bytes:
+    """Fadein chunk of pcm/raw audio using ffmpeg."""
+    # input args
+    args = ["ffmpeg", "-hide_banner", "-loglevel", "error"]
+    args += [
+        "-f",
+        fmt.value,
+        "-ac",
+        "2",
+        "-ar",
+        str(sample_rate),
+        "-i",
+        "-",
+    ]
+    # filter args
+    args += ["-af", f"afade=type=in:start_time=0:duration={fade_length}"]
+    # output args
+    args += ["-f", fmt.value, "-"]
+    async with AsyncProcess(args, True) as proc:
+        result_audio, _ = await proc.communicate(pcm_audio)
+        return result_audio
+
+
 async def strip_silence(
     audio_data: bytes, fmt: ContentType, sample_rate: int, reverse=False
 ) -> bytes:
@@ -372,6 +400,8 @@ async def get_sox_args(
                 "-i",
                 stream_path,
             ]
+        if seek_position:
+            input_args += ["-ss", str(seek_position)]
         # collect output args
         if output_format.is_pcm():
             output_args = [
@@ -389,8 +419,6 @@ async def get_sox_args(
             filter_args += ["-filter:a", f"volume={streamdetails.gain_correct}dB"]
         if resample or input_format.is_pcm():
             filter_args += ["-ar", str(resample)]
-        if seek_position:
-            filter_args += ["-ss", str(seek_position)]
         return input_args + filter_args + output_args
 
     # Prefer SoX for all other (=highest quality)
@@ -442,6 +470,17 @@ async def get_media_stream(
 ) -> AsyncGenerator[Tuple[bool, bytes], None]:
     """Get the audio stream for the given streamdetails."""
 
+    if chunk_size is None:
+        if streamdetails.content_type in (
+            ContentType.AAC,
+            ContentType.M4A,
+            ContentType.MP3,
+            ContentType.OGG,
+        ):
+            chunk_size = 32000
+        else:
+            chunk_size = 256000
+
     mass.signal_event(
         MassEvent(
             EventType.STREAM_STARTED,
@@ -486,7 +525,10 @@ async def get_media_stream(
                 streamdetails.item_id, streamdetails.provider
             )
             # send analyze job to background worker
-            if streamdetails.loudness is None and streamdetails.provider != "url":
+            if (
+                streamdetails.loudness is None
+                and streamdetails.provider != ProviderType.URL
+            ):
                 uri = f"{streamdetails.provider.value}://{streamdetails.media_type.value}/{streamdetails.item_id}"
                 mass.add_job(
                     analyze_audio(mass, streamdetails), f"Analyze audio for {uri}"
index c790b4ca2896d1f4bb8f3005c846f0c540281a9e..958e64b1d7ec1b4a6e7742ae541e644e3560a982 100644 (file)
@@ -14,7 +14,7 @@ from async_timeout import timeout as _timeout
 
 LOGGER = logging.getLogger("AsyncProcess")
 
-DEFAULT_CHUNKSIZE = 176400  # 1 second of pcm 44100/16
+DEFAULT_CHUNKSIZE = 512000
 DEFAULT_TIMEOUT = 120
 
 
diff --git a/music_assistant/helpers/resources/announce.flac b/music_assistant/helpers/resources/announce.flac
new file mode 100644 (file)
index 0000000..95c7cae
Binary files /dev/null and b/music_assistant/helpers/resources/announce.flac differ
index 064e368a571e422faaf73e16e5f59c9591a1945f..90f8d6b4ac04dc45c336a80cc4905635b9879d08 100644 (file)
@@ -2,8 +2,10 @@
 from __future__ import annotations
 
 import asyncio
+import pathlib
 import random
 from asyncio import Task, TimerHandle
+from dataclasses import dataclass
 from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
 
 from music_assistant.helpers.audio import get_stream_details
@@ -23,6 +25,27 @@ from .queue_settings import QueueSettings
 if TYPE_CHECKING:
     from music_assistant.mass import MusicAssistant
 
+RESOURCES_DIR = (
+    pathlib.Path(__file__)
+    .parent.resolve()
+    .parent.resolve()
+    .joinpath("helpers/resources")
+)
+ALERT_ANNOUNCE_FILE = str(RESOURCES_DIR.joinpath("announce.flac"))
+FALLBACK_DURATION = 172800  # if duration is None (e.g. radio stream) = 48 hours
+
+
+@dataclass
+class QueueSnapShot:
+    """Represent a snapshot of the queue and its settings."""
+
+    powered: bool
+    state: PlayerState
+    volume_level: int
+    items: List[QueueItem]
+    index: Optional[int]
+    position: int
+
 
 class PlayerQueue:
     """Represents a PlayerQueue object."""
@@ -42,6 +65,7 @@ class PlayerQueue:
         self._start_index: int = 0
         # start_pos: from which position (in seconds) did the first track start playing?
         self._start_pos: int = 0
+        self._next_fadein: int = 0
         self._next_start_index: int = 0
         self._next_start_pos: int = 0
         self._last_state = PlayerState.IDLE
@@ -51,6 +75,7 @@ class PlayerQueue:
         self._signal_next: bool = False
         self._last_player_update: int = 0
         self._stream_url: str = ""
+        self._snapshot: Optional[QueueSnapShot] = None
 
     async def setup(self) -> None:
         """Handle async setup of instance."""
@@ -161,7 +186,6 @@ class PlayerQueue:
         """
         Play media item(s) on the given queue.
 
-            :param queue_id: queue id of the PlayerQueue to handle the command.
             :param uri: uri(s) that should be played (single item or list of uri's).
             :param queue_opt:
                 QueueOption.PLAY -> Insert new items in queue and start playing at inserted position
@@ -241,6 +265,75 @@ class PlayerQueue:
             await self.append(queue_items)
         return self._stream_url
 
+    async def play_alert(
+        self, uri: str, announce: bool = False, volume_adjust: int = 10
+    ) -> str:
+        """
+        Play given uri as Alert on the queue.
+
+        uri: Uri that should be played as announcement, can be Music Assistant URI or plain url.
+        announce: Prepend the (TTS) alert with a small announce sound.
+        volume_adjust: Adjust the volume of the player by this percentage (relative).
+        """
+        if self._snapshot:
+            self.logger.debug("Ignore play_alert: already in progress")
+            return
+
+        # create snapshot
+        await self.snapshot_create()
+
+        queue_items = []
+
+        # prepend annnounce sound if needed
+        if announce:
+            queue_items.append(QueueItem.from_url(ALERT_ANNOUNCE_FILE, "alert"))
+
+        # parse provided uri into a MA MediaItem or Basic QueueItem from URL
+        try:
+            media_item = await self.mass.music.get_item_by_uri(uri)
+            queue_items.append(QueueItem.from_media_item(media_item))
+        except MusicAssistantError as err:
+            # invalid MA uri or item not found error
+            if uri.startswith("http"):
+                # a plain url was provided
+                queue_items.append(QueueItem.from_url(uri, "alert"))
+            else:
+                raise MediaNotFoundError(f"Invalid uri: {uri}") from err
+
+        # append silence track, we use this to reliably detect when the alert is ready
+        silence_url = self.mass.streams.get_silence_url(600)
+        queue_items.append(QueueItem.from_url(silence_url, "alert"))
+
+        # load queue with alert sound(s)
+        await self.load(queue_items)
+        # set new volume
+        new_volume = self.player.volume_level + (
+            self.player.volume_level / 100 * volume_adjust
+        )
+        await self.player.volume_set(new_volume)
+
+        # wait for the alert to finish playing
+        alert_done = asyncio.Event()
+
+        def handle_event(evt: MassEvent):
+            if (
+                self.current_item
+                and self.current_item.uri == silence_url
+                and self.elapsed_time
+            ):
+                alert_done.set()
+
+        unsub = self.mass.subscribe(
+            handle_event, EventType.QUEUE_TIME_UPDATED, self.queue_id
+        )
+        try:
+            await asyncio.wait_for(alert_done.wait(), 120)
+        finally:
+
+            unsub()
+            # restore queue
+            await self.snapshot_restore()
+
     async def stop(self) -> None:
         """Stop command on queue player."""
         # redirect to underlying player
@@ -296,20 +389,58 @@ class PlayerQueue:
         """Resume previous queue."""
         resume_item = self.current_item
         resume_pos = self._current_item_elapsed_time
-        if resume_item and resume_pos > (resume_item.duration * 0.8):
-            # track is already played for > 80% - skip to next
+        if (
+            resume_item
+            and resume_item.duration
+            and resume_pos > (resume_item.duration * 0.9)
+        ):
+            # track is already played for > 90% - skip to next
             resume_item = self.next_item
             resume_pos = 0
 
         if resume_item is not None:
-            await self.play_index(resume_item.item_id, resume_pos)
+            await self.play_index(resume_item.item_id, resume_pos, 5)
         else:
             self.logger.warning(
                 "resume queue requested for %s but queue is empty", self.queue_id
             )
 
+    async def snapshot_create(self) -> None:
+        """Create snapshot of current Queue state."""
+        self._snapshot = QueueSnapShot(
+            powered=self.player.powered,
+            state=self.player.state,
+            volume_level=self.player.volume_level,
+            items=self._items,
+            index=self._current_index,
+            position=self._current_item_elapsed_time,
+        )
+
+    async def snapshot_restore(self) -> None:
+        """Restore snapshot of Queue state."""
+        assert self._snapshot, "Create snapshot before restoring it."
+        # clear queue first
+        await self.clear()
+        # restore volume
+        await self.player.volume_set(self._snapshot.volume_level)
+        # restore queue
+        await self.update(self._snapshot.items)
+        self._current_index = self._snapshot.index
+        if self._snapshot.state in (PlayerState.PLAYING, PlayerState.PAUSED):
+            await self.resume()
+        if self._snapshot.state == PlayerState.PAUSED:
+            await self.pause()
+        if not self._snapshot.powered:
+            await self.player.power(False)
+        # reset snapshot once restored
+        self._snapshot = None
+
     async def play_index(
-        self, index: Union[int, str], seek_position: int = 0, passive: bool = False
+        self,
+        index: Union[int, str],
+        seek_position: int = 0,
+        fade_in: int = 0,
+        passive: bool = False,
     ) -> None:
         """Play item at index (or item_id) X in queue."""
         if self.player.use_multi_stream:
@@ -323,6 +454,7 @@ class PlayerQueue:
         self._current_index = index
         self._next_start_index = index
         self._next_start_pos = int(seek_position)
+        self._next_fadein = fade_in
         # send stream url to player connected to this queue
         self._stream_url = self.mass.streams.get_stream_url(
             self.queue_id, content_type=self._settings.stream_type
@@ -550,7 +682,7 @@ class PlayerQueue:
             await self.play_index(self._current_index + 2)
             raise err
 
-    async def queue_stream_start(self) -> Tuple[int, int]:
+    async def queue_stream_start(self) -> Tuple[int, int, int]:
         """Call when queue_streamer starts playing the queue stream."""
         start_from_index = self._next_start_index
         self._current_item_elapsed_time = 0
@@ -560,7 +692,9 @@ class PlayerQueue:
         self._index_in_buffer = start_from_index
         seek_position = self._next_start_pos
         self._next_start_pos = 0
-        return (start_from_index, seek_position)
+        fade_in = self._next_fadein
+        self._next_fadein = 0
+        return (start_from_index, seek_position, fade_in)
 
     async def queue_stream_next(self, cur_index: int) -> int | None:
         """Call when queue_streamer loads next track in buffer."""
@@ -633,13 +767,11 @@ class PlayerQueue:
                 if not queue_track.streamdetails:
                     track_time = elapsed_time_queue - total_time
                     break
-                track_duration = (
-                    queue_track.streamdetails.seconds_played or queue_track.duration
-                )
-                if elapsed_time_queue > (track_duration + total_time):
+                track_seconds = queue_track.streamdetails.seconds_played
+                if elapsed_time_queue > (track_seconds + total_time):
                     # total elapsed time is more than (streamed) track duration
                     # move index one up
-                    total_time += track_duration
+                    total_time += track_seconds
                     queue_index += 1
                 else:
                     # no more seconds left to divide, this is our track
@@ -655,6 +787,9 @@ class PlayerQueue:
             try:
                 self._items = [QueueItem.from_dict(x) for x in queue_cache["items"]]
                 self._current_index = queue_cache["current_index"]
+                self._current_item_elapsed_time = queue_cache.get(
+                    "current_item_elapsed_time", 0
+                )
             except (KeyError, AttributeError, TypeError) as err:
                 self.logger.warning(
                     "Unable to restore queue state for queue %s",
@@ -670,5 +805,6 @@ class PlayerQueue:
             {
                 "items": [x.to_dict() for x in self._items],
                 "current_index": self._current_index,
+                "current_item_elapsed_time": self._current_item_elapsed_time,
             },
         )
index b5dba9cf671b77d6b2ca0a59c5544bc8954fc8b8..61c5ded7f4edbe3743bf31cca984d14f9061cc56 100644 (file)
@@ -46,12 +46,11 @@ class QueueItem(DataClassDictMixin):
         return d
 
     @classmethod
-    def from_url(cls, url: str) -> QueueItem:
+    def from_url(cls, url: str, name: Optional[str] = None) -> QueueItem:
         """Create QueueItem from plain url."""
         return cls(
             uri=url,
-            name=url,
-            duration=None,
+            name=name or url,
             media_type=MediaType.UNKNOWN,
             image=None,
             media_item=None,