Fix announce feature (#441)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Wed, 27 Jul 2022 15:25:39 +0000 (17:25 +0200)
committerGitHub <noreply@github.com>
Wed, 27 Jul 2022 15:25:39 +0000 (17:25 +0200)
* Fix announce feature

music_assistant/controllers/streams.py
music_assistant/models/enums.py
music_assistant/models/player_queue.py

index b660610ce2e53f0750215756a58b34e9cae8dd6d..cccbd847f1a776ee91f0918bbc122166a6457145 100644 (file)
@@ -6,7 +6,7 @@ import gc
 import urllib.parse
 from time import time
 from types import CoroutineType
-from typing import TYPE_CHECKING, AsyncGenerator, Dict, Optional
+from typing import TYPE_CHECKING, AsyncGenerator, Dict, Optional, Tuple
 from uuid import uuid4
 
 from aiohttp import web
@@ -17,7 +17,6 @@ from music_assistant.helpers.audio import (
     get_chunksize,
     get_media_stream,
     get_preview_stream,
-    get_silence,
     get_stream_details,
     strip_silence,
 )
@@ -51,6 +50,7 @@ class StreamsController:
         self._port = mass.config.stream_port
         self._ip = mass.config.stream_ip
         self.queue_streams: Dict[str, QueueStream] = {}
+        self.announcements: Dict[str, Tuple[str]] = {}
 
     @property
     def base_url(self) -> str:
@@ -66,6 +66,17 @@ class StreamsController:
         ext = content_type.value
         return f"{self.base_url}/{stream_id}.{ext}"
 
+    def get_announcement_url(
+        self,
+        queue_id: str,
+        urls: Tuple[str],
+        content_type: ContentType,
+    ) -> str:
+        """Start running a queue stream."""
+        self.announcements[queue_id] = urls
+        ext = content_type.value
+        return f"{self.base_url}/announce/{queue_id}.{ext}"
+
     async def get_preview_url(self, provider: ProviderType, track_id: str) -> str:
         """Return url to short preview sample."""
         track = await self.mass.music.tracks.get_provider_item(track_id, provider)
@@ -74,20 +85,12 @@ class StreamsController:
         enc_track_id = urllib.parse.quote(track_id)
         return f"{self.base_url}/preview?provider_id={provider.value}&item_id={enc_track_id}"
 
-    def get_silence_url(
-        self,
-        content_type: ContentType = ContentType.WAV,
-    ) -> str:
-        """Generate stream url for a silence Stream."""
-        ext = content_type.value
-        return f"{self.base_url}/silence.{ext}"
-
     async def setup(self) -> None:
         """Async initialize of module."""
         app = web.Application()
 
         app.router.add_get("/preview", self.serve_preview)
-        app.router.add_get("/silence.{fmt}", self.serve_silence)
+        app.router.add_get("/announce/{queue_id}.{fmt}", self.serve_announcement)
         app.router.add_get("/{stream_id}.{fmt}", self.serve_queue_stream)
 
         runner = web.AppRunner(app, access_log=None)
@@ -131,20 +134,26 @@ class StreamsController:
             await resp.write(chunk)
         return resp
 
-    @staticmethod
-    async def serve_silence(request: web.Request):
-        """Serve some nice silence."""
-        duration = int(request.query.get("duration", 60))
+    async def serve_announcement(self, request: web.Request):
+        """Serve announcement broadcast."""
+        queue_id = request.match_info["queue_id"]
         fmt = ContentType.try_parse(request.match_info["fmt"])
+        urls = self.announcements[queue_id]
 
-        resp = web.StreamResponse(
-            status=200, reason="OK", headers={"Content-Type": f"audio/{fmt.value}"}
-        )
-        await resp.prepare(request)
-        if request.method == "GET":
-            async for chunk in get_silence(duration, fmt):
-                await resp.write(chunk)
-        return resp
+        ffmpeg_args = ["ffmpeg", "-hide_banner", "-loglevel", "quiet"]
+        for url in urls:
+            ffmpeg_args += ["-i", url]
+        if len(urls) > 1:
+            ffmpeg_args += [
+                "-filter_complex",
+                f"[0:a][1:a]concat=n={len(urls)}:v=0:a=1",
+            ]
+        ffmpeg_args += ["-f", fmt.value, "-"]
+
+        async with AsyncProcess(ffmpeg_args) as ffmpeg_proc:
+            output, _ = await ffmpeg_proc.communicate()
+
+        return web.Response(body=output, headers={"Content-Type": f"audio/{fmt.value}"})
 
     async def serve_queue_stream(self, request: web.Request):
         """Serve queue audio stream to a single player."""
@@ -245,12 +254,7 @@ class StreamsController:
         streamdetails = await get_stream_details(self.mass, first_item, queue.queue_id)
 
         # work out pcm details
-        if streamdetails.media_type == MediaType.ANNOUNCEMENT:
-            pcm_sample_rate = 44100
-            pcm_bit_depth = 16
-            pcm_channels = 2
-            allow_resample = True
-        elif queue.settings.crossfade_mode == CrossFadeMode.ALWAYS:
+        if queue.settings.crossfade_mode == CrossFadeMode.ALWAYS:
             pcm_sample_rate = min(96000, queue.settings.max_sample_rate)
             pcm_bit_depth = 24
             pcm_channels = 2
@@ -557,7 +561,6 @@ class QueueStream:
             use_crossfade = (
                 self.queue.settings.crossfade_mode != CrossFadeMode.DISABLED
                 and self.queue.settings.crossfade_duration > 0
-                and streamdetails.media_type != MediaType.ANNOUNCEMENT
             )
             # do not crossfade tracks of same album
             if (
@@ -632,6 +635,15 @@ class QueueStream:
                 else:
                     buffer_duration = 2
 
+                # use dynamic buffer size to account for slow connections (or throttling providers, like YT)
+                # buffer_duration has some overhead to account for padded silence
+                if use_crossfade and buffered_ahead > (crossfade_duration * 4):
+                    buffer_duration = crossfade_duration + 6
+                elif use_crossfade and buffered_ahead > (crossfade_duration * 2):
+                    buffer_duration = crossfade_duration + 4
+                else:
+                    buffer_duration = 2
+
                 ####  HANDLE FIRST PART OF TRACK
 
                 if len(chunk) == 0 and bytes_written == 0:
index 272b919347baed476ef454756f0b8fbae2775c9d..feaa1eefbdc5e473ff7fb27092160921f46da928 100644 (file)
@@ -13,7 +13,6 @@ class MediaType(Enum):
     PLAYLIST = "playlist"
     RADIO = "radio"
     FOLDER = "folder"
-    ANNOUNCEMENT = "announcement"
     UNKNOWN = "unknown"
 
     @classmethod
index 8449119a8c9bf2f8525f2842e44e70763575965a..47d0f7d97520182ae27209c5b7ea84127ccd9b4b 100644 (file)
@@ -2,29 +2,18 @@
 from __future__ import annotations
 
 import asyncio
-import os
 import pathlib
 import random
 from asyncio import TimerHandle
 from dataclasses import dataclass
 from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
 
+from music_assistant.helpers.tags import parse_tags
 from music_assistant.helpers.util import try_parse_int
-from music_assistant.models.enums import (
-    ContentType,
-    EventType,
-    MediaType,
-    ProviderType,
-    QueueOption,
-    RepeatMode,
-)
+from music_assistant.models.enums import EventType, MediaType, QueueOption, RepeatMode
 from music_assistant.models.errors import MediaNotFoundError, MusicAssistantError
 from music_assistant.models.event import MassEvent
-from music_assistant.models.media_items import (
-    MediaItemType,
-    StreamDetails,
-    media_from_dict,
-)
+from music_assistant.models.media_items import MediaItemType, media_from_dict
 
 from .player import Player, PlayerState
 from .queue_item import QueueItem
@@ -41,9 +30,7 @@ RESOURCES_DIR = (
     .joinpath("helpers/resources")
 )
 
-ALERT_ANNOUNCE_FILE = str(RESOURCES_DIR.joinpath("announce.flac"))
-if not os.path.isfile(ALERT_ANNOUNCE_FILE):
-    ALERT_ANNOUNCE_FILE = None
+ANNOUNCE_ALERT_FILE = str(RESOURCES_DIR.joinpath("announce.flac"))
 
 FALLBACK_DURATION = 172800  # if duration is None (e.g. radio stream) = 48 hours
 
@@ -291,37 +278,15 @@ class PlayerQueue:
             )
             return
 
-        def create_announcement(_url: str):
-            return QueueItem(
-                name="announcement",
-                duration=30,
-                streamdetails=StreamDetails(
-                    provider=ProviderType.URL,
-                    item_id=_url,
-                    content_type=ContentType.try_parse(_url),
-                    media_type=MediaType.ANNOUNCEMENT,
-                    loudness=0,
-                    gain_correct=4,
-                    direct=_url,
-                ),
-            )
-
         try:
             # create snapshot
             await self.snapshot_create()
+            wait_time = 2
             # stop player if needed
-            if self.active and self.player.state in (
-                PlayerState.PLAYING,
-                PlayerState.PAUSED,
-            ):
+            if self.active and self.player.state == PlayerState.PLAYING:
                 await self.stop()
                 self.announcement_in_progress = True
-                await self._wait_for_state((PlayerState.OFF, PlayerState.IDLE))
-
-            # turn on player if needed
-            if not self.player.powered:
-                await self.player.power(True)
-                await self._wait_for_state(PlayerState.IDLE)
+                await asyncio.sleep(0.1)
 
             # adjust volume if needed
             if self._settings.announce_volume_increase:
@@ -330,37 +295,29 @@ class PlayerQueue:
                 )
                 announce_volume = min(announce_volume, 100)
                 announce_volume = max(announce_volume, 0)
+                # turn on player if needed (might be needed before adjusting the volume)
+                if not self.player.powered:
+                    await self.player.power(True)
+                    wait_time += 2
                 await self.player.volume_set(announce_volume)
 
-            # adjust queue settings for announce playback
-            self._settings.from_dict(
-                {
-                    "repeat_mode": "off",
-                    "shuffle_enabled": False,
-                }
-            )
-
-            queue_items = []
             # prepend alert sound if needed
-            if prepend_alert and ALERT_ANNOUNCE_FILE:
-                queue_items.append(create_announcement(ALERT_ANNOUNCE_FILE))
-
-            queue_items.append(create_announcement(url))
-
-            # append silence track. we use that as a reliable way to make sure
-            # there is enough buffer for the player to start quickly
-            # and to detect when we finished playing the alert
-            silence_item = create_announcement(self.mass.streams.get_silence_url())
-            queue_items.append(silence_item)
+            if prepend_alert:
+                announce_urls = (ANNOUNCE_ALERT_FILE, url)
+                wait_time += 2
+            else:
+                announce_urls = (url,)
 
-            # start queue with announcement sound(s)
-            self._items = queue_items
-            stream = await self.queue_stream_start(start_index=0, seek_position=0)
-            # execute the play command on the player(s)
-            await self.player.play_url(stream.url)
+            # send announcement stream to player
+            announce_stream_url = self.mass.streams.get_announcement_url(
+                self.queue_id, announce_urls, self._settings.stream_type
+            )
+            await self.player.play_url(announce_stream_url)
 
             # wait for the player to finish playing
-            await self._wait_for_state(PlayerState.PLAYING, silence_item.item_id)
+            info = await parse_tags(url)
+            wait_time += info.duration or 10
+            await asyncio.sleep(wait_time)
 
         except Exception as err:  # pylint: disable=broad-except
             self.logger.exception("Error while playing announcement", exc_info=err)
@@ -893,7 +850,7 @@ class PlayerQueue:
         self,
         state: Union[None, PlayerState, Tuple[PlayerState]],
         queue_item_id: Optional[str] = None,
-        timeout: int = 30,
+        timeout: int = 120,
     ) -> None:
         """Wait for player(queue) to reach a specific state."""
         if state is not None and not isinstance(state, tuple):