Add a bunch of extra error handling and logging for flow streams
author: Marcel van der Veldt <m.vanderveldt@outlook.com>
Sun, 22 Feb 2026 01:47:07 +0000 (02:47 +0100)
committer: Marcel van der Veldt <m.vanderveldt@outlook.com>
Sun, 22 Feb 2026 01:47:07 +0000 (02:47 +0100)
music_assistant/controllers/streams/streams_controller.py
music_assistant/helpers/ffmpeg.py
music_assistant/helpers/process.py
music_assistant/providers/airplay/stream_session.py
music_assistant/providers/snapcast/ma_stream.py
music_assistant/providers/universal_group/ugp_stream.py

index ecdb25cfe565cdcc1190c3eb8c3ab04c587459ee..6b439e0a7d2df17e28bece151b80f953c3233639 100644 (file)
@@ -1006,10 +1006,13 @@ class StreamsController(CoreController):
                     break
 
             if queue_track.streamdetails is None:
-                raise InvalidDataError(
-                    "No Streamdetails known for queue item %s",
+                self.logger.error(
+                    "No StreamDetails for queue item %s (%s) on queue %s - skipping track",
                     queue_track.queue_item_id,
+                    queue_track.name,
+                    queue.display_name,
                 )
+                continue
 
             self.logger.debug(
                 "Start Streaming queue track: %s (%s) for queue %s",
@@ -1086,30 +1089,45 @@ class StreamsController(CoreController):
                     fadein_part = buffer[:crossfade_buffer_size]
                     remaining_bytes = buffer[crossfade_buffer_size:]
                     # Use the mixer to handle all crossfade logic
-                    crossfade_part = await self._smart_fades_mixer.mix(
-                        fade_in_part=fadein_part,
-                        fade_out_part=last_fadeout_part,
-                        fade_in_streamdetails=queue_track.streamdetails,
-                        fade_out_streamdetails=last_streamdetails,
-                        pcm_format=pcm_format,
-                        standard_crossfade_duration=standard_crossfade_duration,
-                        mode=smart_fades_mode,
-                    )
-                    # because the crossfade exists of both the fadein and fadeout part
-                    # we need to correct the bytes_written accordingly so the duration
-                    # calculations at the end of the track are correct
-                    crossfade_part_len = len(crossfade_part)
-                    bytes_written += int(crossfade_part_len / 2)
-                    if last_play_log_entry:
-                        assert last_play_log_entry.seconds_streamed is not None
-                        last_play_log_entry.seconds_streamed += (
-                            crossfade_part_len / 2 / pcm_sample_size
+                    try:
+                        crossfade_part = await self._smart_fades_mixer.mix(
+                            fade_in_part=fadein_part,
+                            fade_out_part=last_fadeout_part,
+                            fade_in_streamdetails=queue_track.streamdetails,
+                            fade_out_streamdetails=last_streamdetails,
+                            pcm_format=pcm_format,
+                            standard_crossfade_duration=standard_crossfade_duration,
+                            mode=smart_fades_mode,
                         )
-                    # yield crossfade_part (in pcm_sample_size chunks)
-                    for _chunk in divide_chunks(crossfade_part, pcm_sample_size):
-                        yield _chunk
-                        del _chunk
-                    del crossfade_part
+                    except Exception as mix_err:
+                        self.logger.warning(
+                            "Crossfade mixer failed for %s, falling back to simple concat: %s",
+                            queue_track.name,
+                            mix_err,
+                        )
+                        # Fallback: just output the fadeout part then the buffer
+                        for _chunk in divide_chunks(last_fadeout_part, pcm_sample_size):
+                            yield _chunk
+                            bytes_written += len(_chunk)
+                            del _chunk
+                        crossfade_part = b""
+                        remaining_bytes = buffer
+                    if crossfade_part:
+                        # because the crossfade exists of both the fadein and fadeout part
+                        # we need to correct the bytes_written accordingly so the duration
+                        # calculations at the end of the track are correct
+                        crossfade_part_len = len(crossfade_part)
+                        bytes_written += int(crossfade_part_len / 2)
+                        if last_play_log_entry:
+                            assert last_play_log_entry.seconds_streamed is not None
+                            last_play_log_entry.seconds_streamed += (
+                                crossfade_part_len / 2 / pcm_sample_size
+                            )
+                        # yield crossfade_part (in pcm_sample_size chunks)
+                        for _chunk in divide_chunks(crossfade_part, pcm_sample_size):
+                            yield _chunk
+                            del _chunk
+                        del crossfade_part
                     # also write the leftover bytes from the crossfade action
                     if remaining_bytes:
                         yield remaining_bytes
@@ -1127,6 +1145,18 @@ class StreamsController(CoreController):
                     buffer = buffer[pcm_sample_size:]
 
             #### HANDLE END OF TRACK
+            if not first_chunk_received:
+                # Track failed to stream - no chunks received at all
+                self.logger.warning(
+                    "Track %s (%s) on queue %s produced no audio data - skipping",
+                    queue_track.name,
+                    queue_track.streamdetails.uri if queue_track.streamdetails else "unknown",
+                    queue.display_name,
+                )
+                # Clean up and continue to next track
+                queue_track.streamdetails.stream_error = True
+                del buffer
+                continue
             if last_fadeout_part:
                 # edge case: we did not get enough data to make the crossfade
                 for _chunk in divide_chunks(last_fadeout_part, pcm_sample_size):
@@ -1459,6 +1489,21 @@ class StreamsController(CoreController):
                 streamdetails.uri,
                 err,
             )
+        except asyncio.CancelledError:
+            # Don't swallow cancellation - let it propagate
+            raise
+        except Exception as err:
+            # Catch any other unexpected exceptions to prevent them from
+            # silently killing the entire queue stream
+            streamdetails.stream_error = True
+            if raise_on_error:
+                raise
+            self.logger.exception(
+                "Unexpected error while streaming queue item %s (%s): %s",
+                queue_item.name,
+                streamdetails.uri,
+                err,
+            )
         finally:
             # determine how many seconds we've streamed
             # for pcm output we can calculate this easily
index e2897762f4911ee1947eb04aff7c177dbc79178c..f886e576b6a9c4532cef4e082021a3f50ff231dc 100644 (file)
@@ -109,10 +109,17 @@ class FFMpeg(AsyncProcess):
                 self._stdin_feeder_task.cancel()
             # Always await the task to consume any exception and prevent
             # "Task exception was never retrieved" errors.
-            # Suppress CancelledError (from cancel) and any other exception
-            # since exceptions have already been propagated through the generator chain.
-            with suppress(asyncio.CancelledError, Exception):
+            try:
                 await self._stdin_feeder_task
+            except asyncio.CancelledError:
+                pass  # Expected when we cancel the task
+            except Exception as err:
+                # Log unexpected exceptions from the stdin feeder before suppressing
+                # The audio source may have failed, and we need visibility into this
+                self.logger.warning(
+                    "FFMpeg stdin feeder task ended with error: %s",
+                    err,
+                )
         if self._stderr_reader_task:
             if not self._stderr_reader_task.done():
                 self._stderr_reader_task.cancel()
index 090df1ee611a0e560e9d2dc425128e429537d768..a1287efa42d5dd75fbb00429748d9ba881d46f36 100644 (file)
@@ -260,10 +260,16 @@ class AsyncProcess:
                 self._stdin_feeder_task.cancel()
             # Always await the task to consume any exception and prevent
             # "Task exception was never retrieved" errors.
-            # Suppress CancelledError (from cancel) and any other exception
-            # since exceptions have already been propagated through the generator chain.
-            with suppress(asyncio.CancelledError, Exception):
+            try:
                 await self._stdin_feeder_task
+            except asyncio.CancelledError:
+                pass  # Expected when we cancel the task
+            except Exception as err:
+                # Log unexpected exceptions from the stdin feeder before suppressing
+                LOGGER.warning(
+                    "Process stdin feeder task ended with error: %s",
+                    err,
+                )
 
         # close stdin to signal we're done sending data
         with suppress(TimeoutError, asyncio.CancelledError):
index 3ec7c9d1dacde705f1917ca47a2db2bbb5e50a50..edda8cba7732aeaf8f602160da74ccb7f93470f9 100644 (file)
@@ -216,6 +216,7 @@ class AirPlayStreamSession:
         """Stream audio to all players."""
         pcm_sample_size = self.pcm_format.pcm_sample_size
         watchdog_task = asyncio.create_task(self._silence_watchdog(pcm_sample_size))
+        stream_error: BaseException | None = None
         try:
             async for chunk in audio_source:
                 if not self._first_chunk_received.is_set():
@@ -232,11 +233,26 @@ class AirPlayStreamSession:
                     self.prov.logger.debug("No running clients remaining, stopping audio streamer")
                     break
                 self.seconds_streamed += len(chunk) / pcm_sample_size
+        except asyncio.CancelledError:
+            self.prov.logger.debug("Audio streamer cancelled after %.1fs", self.seconds_streamed)
+            raise
+        except Exception as err:
+            stream_error = err
+            self.prov.logger.error(
+                "Audio source error after %.1fs of streaming: %s",
+                self.seconds_streamed,
+                err,
+                exc_info=err,
+            )
         finally:
             if not watchdog_task.done():
                 watchdog_task.cancel()
                 with suppress(asyncio.CancelledError):
                     await watchdog_task
+            if stream_error:
+                self.prov.logger.warning(
+                    "Stream ended prematurely due to error - notifying players"
+                )
         async with self._lock:
             await asyncio.gather(
                 *[
index 8856328b4d993559ce0460f7575927315e5d967d..15462c6f59c06e5427e01e27e196d48a09fa685d 100644 (file)
@@ -325,31 +325,39 @@ class SnapcastMAStream:
                 for t in pending:
                     t.cancel()
                 await asyncio.gather(*pending, return_exceptions=True)
-
+        except asyncio.CancelledError:
+            self._logger.debug("Snapcast stream %s cancelled", self.stream_name)
+            raise
+        except Exception as err:
+            self._logger.error("Snapcast stream %s error: %s", self.stream_name, err, exc_info=err)
+            raise
         finally:
             self._is_streaming = False
             self._logger.debug("Finished streaming to %s", stream_path)
-            # Wait a bit for snap stream to become idle
-            try:
-
-                async def wait_until_idle() -> None:
-                    while True:
-                        stream_is_idle = False
-                        with suppress(KeyError):
-                            snap_stream = self._provider._snapserver.stream(self.stream_name)
-                            stream_is_idle = snap_stream.status == "idle"
-                        if self._mass.closing or stream_is_idle:
-                            break
-                        await asyncio.sleep(0.25)
-
-                await asyncio.wait_for(wait_until_idle(), timeout=10.0)
-            except TimeoutError:
-                self._logger.warning(
-                    "Timeout waiting for stream %s to become idle",
-                    self.stream_name,
-                )
-            finally:
-                self._streaming_started_at = None
+            await self._wait_stream_idle()
+
+    async def _wait_stream_idle(self) -> None:
+        """Wait for the Snapcast stream to become idle after streaming ends."""
+        try:
+
+            async def wait_until_idle() -> None:
+                while True:
+                    stream_is_idle = False
+                    with suppress(KeyError):
+                        snap_stream = self._provider._snapserver.stream(self.stream_name)
+                        stream_is_idle = snap_stream.status == "idle"
+                    if self._mass.closing or stream_is_idle:
+                        break
+                    await asyncio.sleep(0.25)
+
+            await asyncio.wait_for(wait_until_idle(), timeout=10.0)
+        except TimeoutError:
+            self._logger.warning(
+                "Timeout waiting for stream %s to become idle",
+                self.stream_name,
+            )
+        finally:
+            self._streaming_started_at = None
 
     def _on_streamer_done(self, t: asyncio.Task[None]) -> None:
         """Handle streamer task completion and optional cleanup."""
index 12c02bbe44ac559dad7f300fc4387cc6f36b60c1..8a10012f8fe38fd1d466273a0ee0dbe98236e945 100644 (file)
@@ -9,6 +9,7 @@ filter_params for each client.
 from __future__ import annotations
 
 import asyncio
+import logging
 from collections.abc import AsyncGenerator, Awaitable, Callable
 from contextlib import suppress
 from typing import TYPE_CHECKING
@@ -16,9 +17,12 @@ from typing import TYPE_CHECKING
 if TYPE_CHECKING:
     from music_assistant_models.media_items import AudioFormat
 
+from music_assistant.constants import MASS_LOGGER_NAME
 from music_assistant.helpers.ffmpeg import get_ffmpeg_stream
 from music_assistant.helpers.util import empty_queue
 
+LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.providers.ugp_stream")
+
 
 class UGPStream:
     """
@@ -96,17 +100,24 @@ class UGPStream:
     async def _runner(self) -> None:
         """Run the stream for the given audio source."""
         await asyncio.sleep(0.25)  # small delay to allow subscribers to connect
-        async for chunk in get_ffmpeg_stream(
-            audio_input=self.audio_source,
-            input_format=self.input_format,
-            output_format=self.base_pcm_format,
-            # we don't allow the player to buffer too much ahead so we use readrate limiting
-            extra_input_args=["-readrate", "1.1", "-readrate_initial_burst", "10"],
-        ):
-            await asyncio.gather(
-                *[sub(chunk) for sub in self.subscribers],
-                return_exceptions=True,
-            )
-        # empty chunk when done
-        await asyncio.gather(*[sub(b"") for sub in self.subscribers], return_exceptions=True)
-        self._done.set()
+        try:
+            async for chunk in get_ffmpeg_stream(
+                audio_input=self.audio_source,
+                input_format=self.input_format,
+                output_format=self.base_pcm_format,
+                # we don't allow the player to buffer too much ahead so we use readrate limiting
+                extra_input_args=["-readrate", "1.1", "-readrate_initial_burst", "10"],
+            ):
+                await asyncio.gather(
+                    *[sub(chunk) for sub in self.subscribers],
+                    return_exceptions=True,
+                )
+        except asyncio.CancelledError:
+            LOGGER.debug("UGP stream runner cancelled")
+            raise
+        except Exception as err:
+            LOGGER.error("UGP stream runner error: %s", err, exc_info=err)
+        finally:
+            # empty chunk when done
+            await asyncio.gather(*[sub(b"") for sub in self.subscribers], return_exceptions=True)
+            self._done.set()