From: Marcel van der Veldt Date: Thu, 2 Jun 2022 22:41:51 +0000 (+0200) Subject: Add play alert feature (#356) X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=96c1f82fb1d77d480ae4e0323945a41da7e5bf29;p=music-assistant-server.git Add play alert feature (#356) * add play alert feature * create constant for fallback duration * improvements and fix buffers * move seek to top * fade in at resume --- diff --git a/music_assistant/controllers/stream.py b/music_assistant/controllers/stream.py index 246dddb5..12031eca 100644 --- a/music_assistant/controllers/stream.py +++ b/music_assistant/controllers/stream.py @@ -13,6 +13,7 @@ from music_assistant.helpers.audio import ( check_audio_support, create_wave_header, crossfade_pcm_parts, + fadein_pcm_part, get_media_stream, get_preview_stream, get_sox_args_for_pcm_stream, @@ -73,11 +74,16 @@ class StreamController: enc_track_id = urllib.parse.quote(track_id) return f"http://{self._ip}:{self._port}/preview?provider_id={provider.value}&item_id={enc_track_id}" + def get_silence_url(self, duration: int = 600) -> str: + """Return url to silence.""" + return f"http://{self._ip}:{self._port}/silence?duration={duration}" + async def setup(self) -> None: """Async initialize of module.""" app = web.Application() app.router.add_get("/preview", self.serve_preview) + app.router.add_get("/silence", self.serve_silence) app.router.add_get( "/{queue_id}/{player_id}.{format}", self.serve_multi_client_queue_stream, @@ -120,6 +126,19 @@ class StreamController: self.logger.info("Started stream server on port %s", self._port) + @staticmethod + async def serve_silence(request: web.Request): + """Serve silence.""" + resp = web.StreamResponse( + status=200, reason="OK", headers={"Content-Type": "audio/wav"} + ) + await resp.prepare(request) + duration = int(request.query.get("duration", 600)) + await resp.write(create_wave_header(duration=duration)) + for _ in range(0, duration): + await resp.write(b"\0" * 1764000) + return resp + async def serve_preview(self, request: web.Request): """Serve short preview sample.""" provider_id = request.query["provider_id"] @@ -148,9 +167,7 @@ class StreamController: # send stop here to prevent the player from retrying over and over await queue.stop() # send some silence to allow the player to process the stop request - result = create_wave_header(duration=10) - result += b"\0" * 1764000 - return web.Response(status=200, body=result, content_type="audio/wav") + return await self.serve_silence(request) resp = web.StreamResponse( status=200, reason="OK", headers={"Content-Type": f"audio/{fmt}"} @@ -414,6 +431,7 @@ class StreamController: resample: bool = False, ) -> AsyncGenerator[None, bytes]: """Stream the PlayerQueue's tracks as constant feed of PCM raw audio.""" + bytes_written_total = 0 last_fadeout_data = b"" queue_index = None track_count = 0 @@ -429,15 +447,15 @@ class StreamController: # stream queue tracks one by one while True: - start_timestamp = time() # get the (next) track in queue track_count += 1 if track_count == 1: # report start of queue playback so we can calculate current track/duration etc. - queue_index, seek_position = await queue.queue_stream_start() + queue_index, seek_position, fade_in = await queue.queue_stream_start() else: queue_index = await queue.queue_stream_next(queue_index) seek_position = 0 + fade_in = 0 queue_track = queue.get_item(queue_index) if not queue_track: self.logger.debug( @@ -524,19 +542,25 @@ class StreamController: # HANDLE FIRST PART OF TRACK if not chunk and bytes_written == 0 and is_last_chunk: - # stream error: got empy first chunk + # stream error: got empy first chunk ?! self.logger.warning("Stream error on %s", queue_track.uri) - # prevent player queue get stuck by just skipping to the next track - queue_track.duration = 0 - continue - if cur_chunk <= 2 and not last_fadeout_data: + elif cur_chunk == 1 and last_fadeout_data: + prev_chunk = chunk + del chunk + elif cur_chunk == 1 and fade_in: + # fadein first chunk + fadein_first_part = await fadein_pcm_part( + chunk, fade_in, pcm_fmt, sample_rate + ) + yield fadein_first_part + bytes_written += len(fadein_first_part) + del chunk + del fadein_first_part + elif cur_chunk <= 2 and not last_fadeout_data: # no fadeout_part available so just pass it to the output directly yield chunk bytes_written += len(chunk) del chunk - elif cur_chunk == 1 and last_fadeout_data: - prev_chunk = chunk - del chunk # HANDLE CROSSFADE OF PREVIOUS TRACK FADE_OUT AND THIS TRACK FADE_IN elif cur_chunk == 2 and last_fadeout_data: # combine the first 2 chunks and strip off silence @@ -617,15 +641,16 @@ class StreamController: else: prev_chunk = chunk del chunk - # allow clients to only buffer max ~15 seconds ahead - seconds_streamed = bytes_written / sample_size - seconds_needed = int(time() - start_timestamp) + 15 - diff = seconds_streamed - seconds_needed - if diff: + # allow clients to only buffer max ~10 seconds ahead + queue_track.streamdetails.seconds_played = bytes_written / sample_size + seconds_buffered = (bytes_written_total + bytes_written) / sample_size + seconds_needed = queue.player.elapsed_time + 10 + diff = seconds_buffered - seconds_needed + track_time = queue_track.duration or 0 + if track_time > 10 and diff > 1: await asyncio.sleep(diff) # end of the track reached - # set actual duration to the queue for more accurate now playing info - queue_track.streamdetails.seconds_played = bytes_written / sample_size + bytes_written_total += bytes_written self.logger.debug( "Finished Streaming queue track: %s (%s) on queue %s", queue_track.uri, diff --git a/music_assistant/helpers/audio.py b/music_assistant/helpers/audio.py index c17cb86e..d12d5afd 100644 --- a/music_assistant/helpers/audio.py +++ b/music_assistant/helpers/audio.py @@ -93,6 +93,34 @@ async def crossfade_pcm_parts( return crossfade_part +async def fadein_pcm_part( + pcm_audio: bytes, + fade_length: int, + fmt: ContentType, + sample_rate: int, +) -> bytes: + """Fadein chunk of pcm/raw audio using ffmpeg.""" + # input args + args = ["ffmpeg", "-hide_banner", "-loglevel", "error"] + args += [ + "-f", + fmt.value, + "-ac", + "2", + "-ar", + str(sample_rate), + "-i", + "-", + ] + # filter args + args += ["-af", f"afade=type=in:start_time=0:duration={fade_length}"] + # output args + args += ["-f", fmt.value, "-"] + async with AsyncProcess(args, True) as proc: + result_audio, _ = await proc.communicate(pcm_audio) + return result_audio + + async def strip_silence( audio_data: bytes, fmt: ContentType, sample_rate: int, reverse=False ) -> bytes: @@ -372,6 +400,8 @@ async def get_sox_args( "-i", stream_path, ] + if seek_position: + input_args += ["-ss", str(seek_position)] # collect output args if output_format.is_pcm(): output_args = [ @@ -389,8 +419,6 @@ async def get_sox_args( filter_args += ["-filter:a", f"volume={streamdetails.gain_correct}dB"] if resample or input_format.is_pcm(): filter_args += ["-ar", str(resample)] - if seek_position: - filter_args += ["-ss", str(seek_position)] return input_args + filter_args + output_args # Prefer SoX for all other (=highest quality) @@ -442,6 +470,17 @@ async def get_media_stream( ) -> AsyncGenerator[Tuple[bool, bytes], None]: """Get the audio stream for the given streamdetails.""" + if chunk_size is None: + if streamdetails.content_type in ( + ContentType.AAC, + ContentType.M4A, + ContentType.MP3, + ContentType.OGG, + ): + chunk_size = 32000 + else: + chunk_size = 256000 + mass.signal_event( MassEvent( EventType.STREAM_STARTED, @@ -486,7 +525,10 @@ async def get_media_stream( streamdetails.item_id, streamdetails.provider ) # send analyze job to background worker - if streamdetails.loudness is None and streamdetails.provider != "url": + if ( + streamdetails.loudness is None + and streamdetails.provider != ProviderType.URL + ): uri = f"{streamdetails.provider.value}://{streamdetails.media_type.value}/{streamdetails.item_id}" mass.add_job( analyze_audio(mass, streamdetails), f"Analyze audio for {uri}" diff --git a/music_assistant/helpers/process.py b/music_assistant/helpers/process.py index c790b4ca..958e64b1 100644 --- a/music_assistant/helpers/process.py +++ b/music_assistant/helpers/process.py @@ -14,7 +14,7 @@ from async_timeout import timeout as _timeout LOGGER = logging.getLogger("AsyncProcess") -DEFAULT_CHUNKSIZE = 176400 # 1 second of pcm 44100/16 +DEFAULT_CHUNKSIZE = 512000 DEFAULT_TIMEOUT = 120 diff --git a/music_assistant/helpers/resources/announce.flac b/music_assistant/helpers/resources/announce.flac new file mode 100644 index 00000000..95c7caec Binary files /dev/null and b/music_assistant/helpers/resources/announce.flac differ diff --git a/music_assistant/models/player_queue.py b/music_assistant/models/player_queue.py index 064e368a..90f8d6b4 100644 --- a/music_assistant/models/player_queue.py +++ b/music_assistant/models/player_queue.py @@ -2,8 +2,10 @@ from __future__ import annotations import asyncio +import pathlib import random from asyncio import Task, TimerHandle +from dataclasses import dataclass from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union from music_assistant.helpers.audio import get_stream_details @@ -23,6 +25,27 @@ from .queue_settings import QueueSettings if TYPE_CHECKING: from music_assistant.mass import MusicAssistant +RESOURCES_DIR = ( + pathlib.Path(__file__) + .parent.resolve() + .parent.resolve() + .joinpath("helpers/resources") +) +ALERT_ANNOUNCE_FILE = str(RESOURCES_DIR.joinpath("announce.flac")) +FALLBACK_DURATION = 172800 # if duration is None (e.g. radio stream) = 48 hours + + +@dataclass +class QueueSnapShot: + """Represent a snapshot of the queue and its settings.""" + + powered: bool + state: PlayerState + volume_level: int + items: List[QueueItem] + index: Optional[int] + position: int + class PlayerQueue: """Represents a PlayerQueue object.""" @@ -42,6 +65,7 @@ class PlayerQueue: self._start_index: int = 0 # start_pos: from which position (in seconds) did the first track start playing? self._start_pos: int = 0 + self._next_fadein: int = 0 self._next_start_index: int = 0 self._next_start_pos: int = 0 self._last_state = PlayerState.IDLE @@ -51,6 +75,7 @@ class PlayerQueue: self._signal_next: bool = False self._last_player_update: int = 0 self._stream_url: str = "" + self._snapshot: Optional[QueueSnapShot] = None async def setup(self) -> None: """Handle async setup of instance.""" @@ -161,7 +186,6 @@ class PlayerQueue: """ Play media item(s) on the given queue. - :param queue_id: queue id of the PlayerQueue to handle the command. :param uri: uri(s) that should be played (single item or list of uri's). :param queue_opt: QueueOption.PLAY -> Insert new items in queue and start playing at inserted position @@ -241,6 +265,75 @@ class PlayerQueue: await self.append(queue_items) return self._stream_url + async def play_alert( + self, uri: str, announce: bool = False, volume_adjust: int = 10 + ) -> str: + """ + Play given uri as Alert on the queue. + + uri: Uri that should be played as announcement, can be Music Assistant URI or plain url. + announce: Prepend the (TTS) alert with a small announce sound. + volume_adjust: Adjust the volume of the player by this percentage (relative). + """ + if self._snapshot: + self.logger.debug("Ignore play_alert: already in progress") + return + + # create snapshot + await self.snapshot_create() + + queue_items = [] + + # prepend annnounce sound if needed + if announce: + queue_items.append(QueueItem.from_url(ALERT_ANNOUNCE_FILE, "alert")) + + # parse provided uri into a MA MediaItem or Basic QueueItem from URL + try: + media_item = await self.mass.music.get_item_by_uri(uri) + queue_items.append(QueueItem.from_media_item(media_item)) + except MusicAssistantError as err: + # invalid MA uri or item not found error + if uri.startswith("http"): + # a plain url was provided + queue_items.append(QueueItem.from_url(uri, "alert")) + else: + raise MediaNotFoundError(f"Invalid uri: {uri}") from err + + # append silence track, we use this to reliably detect when the alert is ready + silence_url = self.mass.streams.get_silence_url(600) + queue_items.append(QueueItem.from_url(silence_url, "alert")) + + # load queue with alert sound(s) + await self.load(queue_items) + # set new volume + new_volume = self.player.volume_level + ( + self.player.volume_level / 100 * volume_adjust + ) + await self.player.volume_set(new_volume) + + # wait for the alert to finish playing + alert_done = asyncio.Event() + + def handle_event(evt: MassEvent): + if ( + self.current_item + and self.current_item.uri == silence_url + and self.elapsed_time + ): + alert_done.set() + + unsub = self.mass.subscribe( + handle_event, EventType.QUEUE_TIME_UPDATED, self.queue_id + ) + try: + await asyncio.wait_for(alert_done.wait(), 120) + finally: + + unsub() + # restore queue + await self.snapshot_restore() + async def stop(self) -> None: """Stop command on queue player.""" # redirect to underlying player @@ -296,20 +389,58 @@ class PlayerQueue: """Resume previous queue.""" resume_item = self.current_item resume_pos = self._current_item_elapsed_time - if resume_item and resume_pos > (resume_item.duration * 0.8): - # track is already played for > 80% - skip to next + if ( + resume_item + and resume_item.duration + and resume_pos > (resume_item.duration * 0.9) + ): + # track is already played for > 90% - skip to next resume_item = self.next_item resume_pos = 0 if resume_item is not None: - await self.play_index(resume_item.item_id, resume_pos) + await self.play_index(resume_item.item_id, resume_pos, 5) else: self.logger.warning( "resume queue requested for %s but queue is empty", self.queue_id ) + async def snapshot_create(self) -> None: + """Create snapshot of current Queue state.""" + self._snapshot = QueueSnapShot( + powered=self.player.powered, + state=self.player.state, + volume_level=self.player.volume_level, + items=self._items, + index=self._current_index, + position=self._current_item_elapsed_time, + ) + + async def snapshot_restore(self) -> None: + """Restore snapshot of Queue state.""" + assert self._snapshot, "Create snapshot before restoring it." + # clear queue first + await self.clear() + # restore volume + await self.player.volume_set(self._snapshot.volume_level) + # restore queue + await self.update(self._snapshot.items) + self._current_index = self._snapshot.index + if self._snapshot.state in (PlayerState.PLAYING, PlayerState.PAUSED): + await self.resume() + if self._snapshot.state == PlayerState.PAUSED: + await self.pause() + if not self._snapshot.powered: + await self.player.power(False) + # reset snapshot once restored + self._snapshot = None + async def play_index( - self, index: Union[int, str], seek_position: int = 0, passive: bool = False + self, + index: Union[int, str], + seek_position: int = 0, + fade_in: int = 0, + passive: bool = False, ) -> None: """Play item at index (or item_id) X in queue.""" if self.player.use_multi_stream: @@ -323,6 +454,7 @@ class PlayerQueue: self._current_index = index self._next_start_index = index self._next_start_pos = int(seek_position) + self._next_fadein = fade_in # send stream url to player connected to this queue self._stream_url = self.mass.streams.get_stream_url( self.queue_id, content_type=self._settings.stream_type @@ -550,7 +682,7 @@ class PlayerQueue: await self.play_index(self._current_index + 2) raise err - async def queue_stream_start(self) -> Tuple[int, int]: + async def queue_stream_start(self) -> Tuple[int, int, int]: """Call when queue_streamer starts playing the queue stream.""" start_from_index = self._next_start_index self._current_item_elapsed_time = 0 @@ -560,7 +692,9 @@ class PlayerQueue: self._index_in_buffer = start_from_index seek_position = self._next_start_pos self._next_start_pos = 0 - return (start_from_index, seek_position) + fade_in = self._next_fadein + self._next_fadein = 0 + return (start_from_index, seek_position, fade_in) async def queue_stream_next(self, cur_index: int) -> int | None: """Call when queue_streamer loads next track in buffer.""" @@ -633,13 +767,11 @@ class PlayerQueue: if not queue_track.streamdetails: track_time = elapsed_time_queue - total_time break - track_duration = ( - queue_track.streamdetails.seconds_played or queue_track.duration - ) - if elapsed_time_queue > (track_duration + total_time): + track_seconds = queue_track.streamdetails.seconds_played + if elapsed_time_queue > (track_seconds + total_time): # total elapsed time is more than (streamed) track duration # move index one up - total_time += track_duration + total_time += track_seconds queue_index += 1 else: # no more seconds left to divide, this is our track @@ -655,6 +787,9 @@ class PlayerQueue: try: self._items = [QueueItem.from_dict(x) for x in queue_cache["items"]] self._current_index = queue_cache["current_index"] + self._current_item_elapsed_time = queue_cache.get( + "current_item_elapsed_time", 0 + ) except (KeyError, AttributeError, TypeError) as err: self.logger.warning( "Unable to restore queue state for queue %s", @@ -670,5 +805,6 @@ class PlayerQueue: { "items": [x.to_dict() for x in self._items], "current_index": self._current_index, + "current_item_elapsed_time": self._current_item_elapsed_time, }, ) diff --git a/music_assistant/models/queue_item.py b/music_assistant/models/queue_item.py index b5dba9cf..61c5ded7 100644 --- a/music_assistant/models/queue_item.py +++ b/music_assistant/models/queue_item.py @@ -46,12 +46,11 @@ class QueueItem(DataClassDictMixin): return d @classmethod - def from_url(cls, url: str) -> QueueItem: + def from_url(cls, url: str, name: Optional[str] = None) -> QueueItem: """Create QueueItem from plain url.""" return cls( uri=url, - name=url, - duration=None, + name=name or url, media_type=MediaType.UNKNOWN, image=None, media_item=None,