From: Marcel van der Veldt Date: Sun, 22 Feb 2026 01:47:07 +0000 (+0100) Subject: Add a bunch of extra error handling and logging for flow streams X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=2bf925238eedd53e2553ac10fa109b430bdc415f;p=music-assistant-server.git Add a bunch of extra error handling and logging for flow streams --- diff --git a/music_assistant/controllers/streams/streams_controller.py b/music_assistant/controllers/streams/streams_controller.py index ecdb25cf..6b439e0a 100644 --- a/music_assistant/controllers/streams/streams_controller.py +++ b/music_assistant/controllers/streams/streams_controller.py @@ -1006,10 +1006,13 @@ class StreamsController(CoreController): break if queue_track.streamdetails is None: - raise InvalidDataError( - "No Streamdetails known for queue item %s", + self.logger.error( + "No StreamDetails for queue item %s (%s) on queue %s - skipping track", queue_track.queue_item_id, + queue_track.name, + queue.display_name, ) + continue self.logger.debug( "Start Streaming queue track: %s (%s) for queue %s", @@ -1086,30 +1089,45 @@ class StreamsController(CoreController): fadein_part = buffer[:crossfade_buffer_size] remaining_bytes = buffer[crossfade_buffer_size:] # Use the mixer to handle all crossfade logic - crossfade_part = await self._smart_fades_mixer.mix( - fade_in_part=fadein_part, - fade_out_part=last_fadeout_part, - fade_in_streamdetails=queue_track.streamdetails, - fade_out_streamdetails=last_streamdetails, - pcm_format=pcm_format, - standard_crossfade_duration=standard_crossfade_duration, - mode=smart_fades_mode, - ) - # because the crossfade exists of both the fadein and fadeout part - # we need to correct the bytes_written accordingly so the duration - # calculations at the end of the track are correct - crossfade_part_len = len(crossfade_part) - bytes_written += int(crossfade_part_len / 2) - if last_play_log_entry: - assert last_play_log_entry.seconds_streamed is not None - last_play_log_entry.seconds_streamed += ( - crossfade_part_len / 2 / pcm_sample_size + try: + crossfade_part = await self._smart_fades_mixer.mix( + fade_in_part=fadein_part, + fade_out_part=last_fadeout_part, + fade_in_streamdetails=queue_track.streamdetails, + fade_out_streamdetails=last_streamdetails, + pcm_format=pcm_format, + standard_crossfade_duration=standard_crossfade_duration, + mode=smart_fades_mode, ) - # yield crossfade_part (in pcm_sample_size chunks) - for _chunk in divide_chunks(crossfade_part, pcm_sample_size): - yield _chunk - del _chunk - del crossfade_part + except Exception as mix_err: + self.logger.warning( + "Crossfade mixer failed for %s, falling back to simple concat: %s", + queue_track.name, + mix_err, + ) + # Fallback: just output the fadeout part then the buffer + for _chunk in divide_chunks(last_fadeout_part, pcm_sample_size): + yield _chunk + bytes_written += len(_chunk) + del _chunk + crossfade_part = b"" + remaining_bytes = buffer + if crossfade_part: + # because the crossfade exists of both the fadein and fadeout part + # we need to correct the bytes_written accordingly so the duration + # calculations at the end of the track are correct + crossfade_part_len = len(crossfade_part) + bytes_written += int(crossfade_part_len / 2) + if last_play_log_entry: + assert last_play_log_entry.seconds_streamed is not None + last_play_log_entry.seconds_streamed += ( + crossfade_part_len / 2 / pcm_sample_size + ) + # yield crossfade_part (in pcm_sample_size chunks) + for _chunk in divide_chunks(crossfade_part, pcm_sample_size): + yield _chunk + del _chunk + del crossfade_part # also write the leftover bytes from the crossfade action if remaining_bytes: yield remaining_bytes @@ -1127,6 +1145,18 @@ class StreamsController(CoreController): buffer = buffer[pcm_sample_size:] #### HANDLE END OF TRACK + if not first_chunk_received: + # Track failed to stream - no chunks received at all + self.logger.warning( + "Track %s (%s) on queue %s produced no audio data - skipping", + queue_track.name, + queue_track.streamdetails.uri if queue_track.streamdetails else "unknown", + queue.display_name, + ) + # Clean up and continue to next track + queue_track.streamdetails.stream_error = True + del buffer + continue if last_fadeout_part: # edge case: we did not get enough data to make the crossfade for _chunk in divide_chunks(last_fadeout_part, pcm_sample_size): @@ -1459,6 +1489,21 @@ class StreamsController(CoreController): streamdetails.uri, err, ) + except asyncio.CancelledError: + # Don't swallow cancellation - let it propagate + raise + except Exception as err: + # Catch any other unexpected exceptions to prevent them from + # silently killing the entire queue stream + streamdetails.stream_error = True + if raise_on_error: + raise + self.logger.exception( + "Unexpected error while streaming queue item %s (%s): %s", + queue_item.name, + streamdetails.uri, + err, + ) finally: # determine how many seconds we've streamed # for pcm output we can calculate this easily diff --git a/music_assistant/helpers/ffmpeg.py b/music_assistant/helpers/ffmpeg.py index e2897762..f886e576 100644 --- a/music_assistant/helpers/ffmpeg.py +++ b/music_assistant/helpers/ffmpeg.py @@ -109,10 +109,17 @@ class FFMpeg(AsyncProcess): self._stdin_feeder_task.cancel() # Always await the task to consume any exception and prevent # "Task exception was never retrieved" errors. - # Suppress CancelledError (from cancel) and any other exception - # since exceptions have already been propagated through the generator chain. - with suppress(asyncio.CancelledError, Exception): + try: await self._stdin_feeder_task + except asyncio.CancelledError: + pass # Expected when we cancel the task + except Exception as err: + # Log unexpected exceptions from the stdin feeder before suppressing + # The audio source may have failed, and we need visibility into this + self.logger.warning( + "FFMpeg stdin feeder task ended with error: %s", + err, + ) if self._stderr_reader_task: if not self._stderr_reader_task.done(): self._stderr_reader_task.cancel() diff --git a/music_assistant/helpers/process.py b/music_assistant/helpers/process.py index 090df1ee..a1287efa 100644 --- a/music_assistant/helpers/process.py +++ b/music_assistant/helpers/process.py @@ -260,10 +260,16 @@ class AsyncProcess: self._stdin_feeder_task.cancel() # Always await the task to consume any exception and prevent # "Task exception was never retrieved" errors. - # Suppress CancelledError (from cancel) and any other exception - # since exceptions have already been propagated through the generator chain. - with suppress(asyncio.CancelledError, Exception): + try: await self._stdin_feeder_task + except asyncio.CancelledError: + pass # Expected when we cancel the task + except Exception as err: + # Log unexpected exceptions from the stdin feeder before suppressing + LOGGER.warning( + "Process stdin feeder task ended with error: %s", + err, + ) # close stdin to signal we're done sending data with suppress(TimeoutError, asyncio.CancelledError): diff --git a/music_assistant/providers/airplay/stream_session.py b/music_assistant/providers/airplay/stream_session.py index 3ec7c9d1..edda8cba 100644 --- a/music_assistant/providers/airplay/stream_session.py +++ b/music_assistant/providers/airplay/stream_session.py @@ -216,6 +216,7 @@ class AirPlayStreamSession: """Stream audio to all players.""" pcm_sample_size = self.pcm_format.pcm_sample_size watchdog_task = asyncio.create_task(self._silence_watchdog(pcm_sample_size)) + stream_error: BaseException | None = None try: async for chunk in audio_source: if not self._first_chunk_received.is_set(): @@ -232,11 +233,26 @@ class AirPlayStreamSession: self.prov.logger.debug("No running clients remaining, stopping audio streamer") break self.seconds_streamed += len(chunk) / pcm_sample_size + except asyncio.CancelledError: + self.prov.logger.debug("Audio streamer cancelled after %.1fs", self.seconds_streamed) + raise + except Exception as err: + stream_error = err + self.prov.logger.error( + "Audio source error after %.1fs of streaming: %s", + self.seconds_streamed, + err, + exc_info=err, + ) finally: if not watchdog_task.done(): watchdog_task.cancel() with suppress(asyncio.CancelledError): await watchdog_task + if stream_error: + self.prov.logger.warning( + "Stream ended prematurely due to error - notifying players" + ) async with self._lock: await asyncio.gather( *[ diff --git a/music_assistant/providers/snapcast/ma_stream.py b/music_assistant/providers/snapcast/ma_stream.py index 8856328b..15462c6f 100644 --- a/music_assistant/providers/snapcast/ma_stream.py +++ b/music_assistant/providers/snapcast/ma_stream.py @@ -325,31 +325,39 @@ class SnapcastMAStream: for t in pending: t.cancel() await asyncio.gather(*pending, return_exceptions=True) - + except asyncio.CancelledError: + self._logger.debug("Snapcast stream %s cancelled", self.stream_name) + raise + except Exception as err: + self._logger.error("Snapcast stream %s error: %s", self.stream_name, err, exc_info=err) + raise finally: self._is_streaming = False self._logger.debug("Finished streaming to %s", stream_path) - # Wait a bit for snap stream to become idle - try: - - async def wait_until_idle() -> None: - while True: - stream_is_idle = False - with suppress(KeyError): - snap_stream = self._provider._snapserver.stream(self.stream_name) - stream_is_idle = snap_stream.status == "idle" - if self._mass.closing or stream_is_idle: - break - await asyncio.sleep(0.25) - - await asyncio.wait_for(wait_until_idle(), timeout=10.0) - except TimeoutError: - self._logger.warning( - "Timeout waiting for stream %s to become idle", - self.stream_name, - ) - finally: - self._streaming_started_at = None + await self._wait_stream_idle() + + async def _wait_stream_idle(self) -> None: + """Wait for the Snapcast stream to become idle after streaming ends.""" + try: + + async def wait_until_idle() -> None: + while True: + stream_is_idle = False + with suppress(KeyError): + snap_stream = self._provider._snapserver.stream(self.stream_name) + stream_is_idle = snap_stream.status == "idle" + if self._mass.closing or stream_is_idle: + break + await asyncio.sleep(0.25) + + await asyncio.wait_for(wait_until_idle(), timeout=10.0) + except TimeoutError: + self._logger.warning( + "Timeout waiting for stream %s to become idle", + self.stream_name, + ) + finally: + self._streaming_started_at = None def _on_streamer_done(self, t: asyncio.Task[None]) -> None: """Handle streamer task completion and optional cleanup.""" diff --git a/music_assistant/providers/universal_group/ugp_stream.py b/music_assistant/providers/universal_group/ugp_stream.py index 12c02bbe..8a10012f 100644 --- a/music_assistant/providers/universal_group/ugp_stream.py +++ b/music_assistant/providers/universal_group/ugp_stream.py @@ -9,6 +9,7 @@ filter_params for each client. from __future__ import annotations import asyncio +import logging from collections.abc import AsyncGenerator, Awaitable, Callable from contextlib import suppress from typing import TYPE_CHECKING @@ -16,9 +17,12 @@ from typing import TYPE_CHECKING if TYPE_CHECKING: from music_assistant_models.media_items import AudioFormat +from music_assistant.constants import MASS_LOGGER_NAME from music_assistant.helpers.ffmpeg import get_ffmpeg_stream from music_assistant.helpers.util import empty_queue +LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.providers.ugp_stream") + class UGPStream: """ @@ -96,17 +100,24 @@ class UGPStream: async def _runner(self) -> None: """Run the stream for the given audio source.""" await asyncio.sleep(0.25) # small delay to allow subscribers to connect - async for chunk in get_ffmpeg_stream( - audio_input=self.audio_source, - input_format=self.input_format, - output_format=self.base_pcm_format, - # we don't allow the player to buffer too much ahead so we use readrate limiting - extra_input_args=["-readrate", "1.1", "-readrate_initial_burst", "10"], - ): - await asyncio.gather( - *[sub(chunk) for sub in self.subscribers], - return_exceptions=True, - ) - # empty chunk when done - await asyncio.gather(*[sub(b"") for sub in self.subscribers], return_exceptions=True) - self._done.set() + try: + async for chunk in get_ffmpeg_stream( + audio_input=self.audio_source, + input_format=self.input_format, + output_format=self.base_pcm_format, + # we don't allow the player to buffer too much ahead so we use readrate limiting + extra_input_args=["-readrate", "1.1", "-readrate_initial_burst", "10"], + ): + await asyncio.gather( + *[sub(chunk) for sub in self.subscribers], + return_exceptions=True, + ) + except asyncio.CancelledError: + LOGGER.debug("UGP stream runner cancelled") + raise + except Exception as err: + LOGGER.error("UGP stream runner error: %s", err, exc_info=err) + finally: + # empty chunk when done + await asyncio.gather(*[sub(b"") for sub in self.subscribers], return_exceptions=True) + self._done.set()