From 5741dd434478c1cfc3e40b60bc52823c35244a93 Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Wed, 27 Jul 2022 17:25:39 +0200 Subject: [PATCH] Fix announce feature (#441) * Fix announce feature --- music_assistant/controllers/streams.py | 72 +++++++++++--------- music_assistant/models/enums.py | 1 - music_assistant/models/player_queue.py | 93 +++++++------------------- 3 files changed, 67 insertions(+), 99 deletions(-) diff --git a/music_assistant/controllers/streams.py b/music_assistant/controllers/streams.py index b660610c..cccbd847 100644 --- a/music_assistant/controllers/streams.py +++ b/music_assistant/controllers/streams.py @@ -6,7 +6,7 @@ import gc import urllib.parse from time import time from types import CoroutineType -from typing import TYPE_CHECKING, AsyncGenerator, Dict, Optional +from typing import TYPE_CHECKING, AsyncGenerator, Dict, Optional, Tuple from uuid import uuid4 from aiohttp import web @@ -17,7 +17,6 @@ from music_assistant.helpers.audio import ( get_chunksize, get_media_stream, get_preview_stream, - get_silence, get_stream_details, strip_silence, ) @@ -51,6 +50,7 @@ class StreamsController: self._port = mass.config.stream_port self._ip = mass.config.stream_ip self.queue_streams: Dict[str, QueueStream] = {} + self.announcements: Dict[str, Tuple[str]] = {} @property def base_url(self) -> str: @@ -66,6 +66,17 @@ class StreamsController: ext = content_type.value return f"{self.base_url}/{stream_id}.{ext}" + def get_announcement_url( + self, + queue_id: str, + urls: Tuple[str], + content_type: ContentType, + ) -> str: + """Start running a queue stream.""" + self.announcements[queue_id] = urls + ext = content_type.value + return f"{self.base_url}/announce/{queue_id}.{ext}" + async def get_preview_url(self, provider: ProviderType, track_id: str) -> str: """Return url to short preview sample.""" track = await self.mass.music.tracks.get_provider_item(track_id, provider) @@ -74,20 +85,12 @@ class StreamsController: enc_track_id = urllib.parse.quote(track_id) return f"{self.base_url}/preview?provider_id={provider.value}&item_id={enc_track_id}" - def get_silence_url( - self, - content_type: ContentType = ContentType.WAV, - ) -> str: - """Generate stream url for a silence Stream.""" - ext = content_type.value - return f"{self.base_url}/silence.{ext}" - async def setup(self) -> None: """Async initialize of module.""" app = web.Application() app.router.add_get("/preview", self.serve_preview) - app.router.add_get("/silence.{fmt}", self.serve_silence) + app.router.add_get("/announce/{queue_id}.{fmt}", self.serve_announcement) app.router.add_get("/{stream_id}.{fmt}", self.serve_queue_stream) runner = web.AppRunner(app, access_log=None) @@ -131,20 +134,26 @@ class StreamsController: await resp.write(chunk) return resp - @staticmethod - async def serve_silence(request: web.Request): - """Serve some nice silence.""" - duration = int(request.query.get("duration", 60)) + async def serve_announcement(self, request: web.Request): + """Serve announcement broadcast.""" + queue_id = request.match_info["queue_id"] fmt = ContentType.try_parse(request.match_info["fmt"]) + urls = self.announcements[queue_id] - resp = web.StreamResponse( - status=200, reason="OK", headers={"Content-Type": f"audio/{fmt.value}"} - ) - await resp.prepare(request) - if request.method == "GET": - async for chunk in get_silence(duration, fmt): - await resp.write(chunk) - return resp + ffmpeg_args = ["ffmpeg", "-hide_banner", "-loglevel", "quiet"] + for url in urls: + ffmpeg_args += ["-i", url] + if len(urls) > 1: + ffmpeg_args += [ + "-filter_complex", + f"[0:a][1:a]concat=n={len(urls)}:v=0:a=1", + ] + ffmpeg_args += ["-f", fmt.value, "-"] + + async with AsyncProcess(ffmpeg_args) as ffmpeg_proc: + output, _ = await ffmpeg_proc.communicate() + + return web.Response(body=output, headers={"Content-Type": f"audio/{fmt.value}"}) async def serve_queue_stream(self, request: web.Request): """Serve queue audio stream to a single player.""" @@ -245,12 +254,7 @@ class StreamsController: streamdetails = await get_stream_details(self.mass, first_item, queue.queue_id) # work out pcm details - if streamdetails.media_type == MediaType.ANNOUNCEMENT: - pcm_sample_rate = 44100 - pcm_bit_depth = 16 - pcm_channels = 2 - allow_resample = True - elif queue.settings.crossfade_mode == CrossFadeMode.ALWAYS: + if queue.settings.crossfade_mode == CrossFadeMode.ALWAYS: pcm_sample_rate = min(96000, queue.settings.max_sample_rate) pcm_bit_depth = 24 pcm_channels = 2 @@ -557,7 +561,6 @@ class QueueStream: use_crossfade = ( self.queue.settings.crossfade_mode != CrossFadeMode.DISABLED and self.queue.settings.crossfade_duration > 0 - and streamdetails.media_type != MediaType.ANNOUNCEMENT ) # do not crossfade tracks of same album if ( @@ -632,6 +635,15 @@ class QueueStream: else: buffer_duration = 2 + # use dynamic buffer size to account for slow connections (or throttling providers, like YT) + # buffer_duration has some overhead to account for padded silence + if use_crossfade and buffered_ahead > (crossfade_duration * 4): + buffer_duration = crossfade_duration + 6 + elif use_crossfade and buffered_ahead > (crossfade_duration * 2): + buffer_duration = crossfade_duration + 4 + else: + buffer_duration = 2 + #### HANDLE FIRST PART OF TRACK if len(chunk) == 0 and bytes_written == 0: diff --git a/music_assistant/models/enums.py b/music_assistant/models/enums.py index 272b9193..feaa1eef 100644 --- a/music_assistant/models/enums.py +++ b/music_assistant/models/enums.py @@ -13,7 +13,6 @@ class MediaType(Enum): PLAYLIST = "playlist" RADIO = "radio" FOLDER = "folder" - ANNOUNCEMENT = "announcement" UNKNOWN = "unknown" @classmethod diff --git a/music_assistant/models/player_queue.py b/music_assistant/models/player_queue.py index 8449119a..47d0f7d9 100644 --- a/music_assistant/models/player_queue.py +++ b/music_assistant/models/player_queue.py @@ -2,29 +2,18 @@ from __future__ import annotations import asyncio -import os import pathlib import random from asyncio import TimerHandle from dataclasses import dataclass from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union +from music_assistant.helpers.tags import parse_tags from music_assistant.helpers.util import try_parse_int -from music_assistant.models.enums import ( - ContentType, - EventType, - MediaType, - ProviderType, - QueueOption, - RepeatMode, -) +from music_assistant.models.enums import EventType, MediaType, QueueOption, RepeatMode from music_assistant.models.errors import MediaNotFoundError, MusicAssistantError from music_assistant.models.event import MassEvent -from music_assistant.models.media_items import ( - MediaItemType, - StreamDetails, - media_from_dict, -) +from music_assistant.models.media_items import MediaItemType, media_from_dict from .player import Player, PlayerState from .queue_item import QueueItem @@ -41,9 +30,7 @@ RESOURCES_DIR = ( .joinpath("helpers/resources") ) -ALERT_ANNOUNCE_FILE = str(RESOURCES_DIR.joinpath("announce.flac")) -if not os.path.isfile(ALERT_ANNOUNCE_FILE): - ALERT_ANNOUNCE_FILE = None +ANNOUNCE_ALERT_FILE = str(RESOURCES_DIR.joinpath("announce.flac")) FALLBACK_DURATION = 172800 # if duration is None (e.g. radio stream) = 48 hours @@ -291,37 +278,15 @@ class PlayerQueue: ) return - def create_announcement(_url: str): - return QueueItem( - name="announcement", - duration=30, - streamdetails=StreamDetails( - provider=ProviderType.URL, - item_id=_url, - content_type=ContentType.try_parse(_url), - media_type=MediaType.ANNOUNCEMENT, - loudness=0, - gain_correct=4, - direct=_url, - ), - ) - try: # create snapshot await self.snapshot_create() + wait_time = 2 # stop player if needed - if self.active and self.player.state in ( - PlayerState.PLAYING, - PlayerState.PAUSED, - ): + if self.active and self.player.state == PlayerState.PLAYING: await self.stop() self.announcement_in_progress = True - await self._wait_for_state((PlayerState.OFF, PlayerState.IDLE)) - - # turn on player if needed - if not self.player.powered: - await self.player.power(True) - await self._wait_for_state(PlayerState.IDLE) + await asyncio.sleep(0.1) # adjust volume if needed if self._settings.announce_volume_increase: @@ -330,37 +295,29 @@ class PlayerQueue: ) announce_volume = min(announce_volume, 100) announce_volume = max(announce_volume, 0) + # turn on player if needed (might be needed before adjusting the volume) + if not self.player.powered: + await self.player.power(True) + wait_time += 2 await self.player.volume_set(announce_volume) - # adjust queue settings for announce playback - self._settings.from_dict( - { - "repeat_mode": "off", - "shuffle_enabled": False, - } - ) - - queue_items = [] # prepend alert sound if needed - if prepend_alert and ALERT_ANNOUNCE_FILE: - queue_items.append(create_announcement(ALERT_ANNOUNCE_FILE)) - - queue_items.append(create_announcement(url)) - - # append silence track. we use that as a reliable way to make sure - # there is enough buffer for the player to start quickly - # and to detect when we finished playing the alert - silence_item = create_announcement(self.mass.streams.get_silence_url()) - queue_items.append(silence_item) + if prepend_alert: + announce_urls = (ANNOUNCE_ALERT_FILE, url) + wait_time += 2 + else: + announce_urls = (url,) - # start queue with announcement sound(s) - self._items = queue_items - stream = await self.queue_stream_start(start_index=0, seek_position=0) - # execute the play command on the player(s) - await self.player.play_url(stream.url) + # send announcement stream to player + announce_stream_url = self.mass.streams.get_announcement_url( + self.queue_id, announce_urls, self._settings.stream_type + ) + await self.player.play_url(announce_stream_url) # wait for the player to finish playing - await self._wait_for_state(PlayerState.PLAYING, silence_item.item_id) + info = await parse_tags(url) + wait_time += info.duration or 10 + await asyncio.sleep(wait_time) except Exception as err: # pylint: disable=broad-except self.logger.exception("Error while playing announcement", exc_info=err) @@ -893,7 +850,7 @@ class PlayerQueue: self, state: Union[None, PlayerState, Tuple[PlayerState]], queue_item_id: Optional[str] = None, - timeout: int = 30, + timeout: int = 120, ) -> None: """Wait for player(queue) to reach a specific state.""" if state is not None and not isinstance(state, tuple): -- 2.34.1