Better handling of errors during streaming
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Mon, 3 Nov 2025 22:35:53 +0000 (23:35 +0100)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Mon, 3 Nov 2025 22:35:53 +0000 (23:35 +0100)
Try to recover from a failed stream while streaming queue tracks

music_assistant/controllers/streams.py
music_assistant/helpers/audio.py
music_assistant/helpers/audio_buffer.py
music_assistant/helpers/ffmpeg.py

index fb339d0887b33f7114549136c7100314c190a96b..9684608c0c772c2b70c2d8ac1314f29977ff4eaa 100644 (file)
@@ -28,7 +28,7 @@ from music_assistant_models.enums import (
     StreamType,
     VolumeNormalizationMode,
 )
-from music_assistant_models.errors import QueueEmpty
+from music_assistant_models.errors import AudioError, QueueEmpty
 from music_assistant_models.media_items import AudioFormat
 from music_assistant_models.player_queue import PlayLogEntry
 
@@ -62,7 +62,6 @@ from music_assistant.helpers.audio import (
     get_chunksize,
     get_media_stream,
     get_player_filter_params,
-    get_silence,
     get_stream_details,
     resample_pcm_audio,
 )
@@ -327,8 +326,7 @@ class StreamsController(CoreController):
         )
         # Start periodic garbage collection task
         # This ensures memory from audio buffers and streams is cleaned up regularly
-        # DISABLED FOR TESTING - may cause event loop blocking
-        # self.mass.call_later(900, self._periodic_garbage_collection)  # 15 minutes
+        self.mass.call_later(900, self._periodic_garbage_collection)  # 15 minutes
 
     async def close(self) -> None:
         """Cleanup on exit."""
@@ -535,18 +533,13 @@ class StreamsController(CoreController):
                 break
         if queue_item.streamdetails.stream_error:
             self.logger.error(
-                "Error streaming QueueItem %s (%s) to %s",
+                "Error streaming QueueItem %s (%s) to %s - will try to skip to next item",
                 queue_item.name,
                 queue_item.uri,
                 queue.display_name,
             )
-            # some players do not like it when we dont return anything after an error
-            # so we send some silence so they move on to the next track on their own (hopefully)
-            async for chunk in get_silence(10, output_format):
-                try:
-                    await resp.write(chunk)
-                except (BrokenPipeError, ConnectionResetError, ConnectionError):
-                    break
+            # try to skip to the next item in the queue after a short delay
+            self.mass.call_later(5, self.mass.player_queues.next(queue_id))
         return resp
 
     async def serve_queue_flow_stream(self, request: web.Request) -> web.Response:
@@ -1018,6 +1011,7 @@ class StreamsController(CoreController):
                 queue_track,
                 pcm_format=pcm_format,
                 seek_position=queue_track.streamdetails.seek_position,
+                raise_on_error=False,
             ):
                 if not first_chunk_received:
                     first_chunk_received = True
@@ -1084,7 +1078,9 @@ class StreamsController(CoreController):
             #### HANDLE END OF TRACK
             if last_fadeout_part:
                 # edge case: we did not get enough data to make the crossfade
-                yield last_fadeout_part
+                for _chunk in divide_chunks(last_fadeout_part, pcm_sample_size):
+                    yield _chunk
+                    del _chunk
                 bytes_written += len(last_fadeout_part)
                 last_fadeout_part = b""
             if self._crossfade_allowed(
@@ -1102,7 +1098,9 @@ class StreamsController(CoreController):
             elif buffer:
                 # no crossfade enabled, just yield the buffer last part
                 bytes_written += len(buffer)
-                yield buffer
+                for _chunk in divide_chunks(buffer, pcm_sample_size):
+                    yield _chunk
+                    del _chunk
             # make sure the buffer gets cleaned up
             del buffer
 
@@ -1125,7 +1123,9 @@ class StreamsController(CoreController):
         #### HANDLE END OF QUEUE FLOW STREAM
         # end of queue flow: make sure we yield the last_fadeout_part
         if last_fadeout_part:
-            yield last_fadeout_part
+            for _chunk in divide_chunks(last_fadeout_part, pcm_sample_size):
+                yield _chunk
+                del _chunk
             # correct seconds streamed/duration
             last_part_seconds = len(last_fadeout_part) / pcm_sample_size
             queue_track.streamdetails.seconds_streamed += last_part_seconds
@@ -1232,6 +1232,7 @@ class StreamsController(CoreController):
         queue_item: QueueItem,
         pcm_format: AudioFormat,
         seek_position: int = 0,
+        raise_on_error: bool = True,
     ) -> AsyncGenerator[bytes, None]:
         """Get the (PCM) audio stream for a single queue item."""
         # collect all arguments for ffmpeg
@@ -1338,9 +1339,20 @@ class StreamsController(CoreController):
                     yield chunk
                 # help garbage collection by explicitly deleting chunk
                 del chunk
-            if not bytes_received:
-                self.logger.error("No audio data received from source for %s", queue_item.name)
             finished = True
+        except AudioError as err:
+            queue_item.streamdetails.stream_error = True
+            queue_item.available = False
+            if raise_on_error:
+                raise
+            # yes, we swallow the error here after logging it
+            # so the outer stream can handle it gracefully
+            self.logger.error(
+                "AudioError while streaming queue item %s (%s): %s",
+                queue_item.name,
+                queue_item.streamdetails.uri,
+                err,
+            )
         finally:
             # determine how many seconds we've streamed
             # for pcm output we can calculate this easily
@@ -1376,9 +1388,12 @@ class StreamsController(CoreController):
 
         streamdetails = queue_item.streamdetails
         assert streamdetails
-        crossfade_data = self._crossfade_data.get(queue.queue_id)
+        crossfade_data = self._crossfade_data.pop(queue.queue_id, None)
 
-        if crossfade_data and crossfade_data.queue_item_id != queue_item.queue_item_id:
+        if crossfade_data and streamdetails.seek_position > 0:
+            # don't do crossfade when seeking into track
+            crossfade_data = None
+        if crossfade_data and (crossfade_data.queue_item_id != queue_item.queue_item_id):
             # edge case alert: the next item changed just while we were preloading/crossfading
             self.logger.warning(
                 "Skipping crossfade data for queue %s - next item changed!", queue.display_name
@@ -1448,12 +1463,16 @@ class StreamsController(CoreController):
                 # send the (second half of the) crossfade data
                 if crossfade_data.pcm_format != pcm_format:
                     # edge case: pcm format mismatch, we need to resample
-                    crossfade_data.data = await resample_pcm_audio(
+                    resampled_data = await resample_pcm_audio(
                         crossfade_data.data,
                         crossfade_data.pcm_format,
                         pcm_format,
                     )
-                yield crossfade_data.data
+                    for _chunk in divide_chunks(resampled_data, pcm_format.pcm_sample_size):
+                        yield _chunk
+                else:
+                    for _chunk in divide_chunks(crossfade_data.data, pcm_format.pcm_sample_size):
+                        yield _chunk
                 bytes_written += len(crossfade_data.data)
                 # clear vars
                 crossfade_data = None
@@ -1476,20 +1495,27 @@ class StreamsController(CoreController):
                     crossfade_data.pcm_format,
                     pcm_format,
                 )
-            yield crossfade_data.data
+            for _chunk in divide_chunks(crossfade_data.data, pcm_format.pcm_sample_size):
+                yield _chunk
             bytes_written += len(crossfade_data.data)
             crossfade_data = None
+        next_queue_item: QueueItem | None = None
         if not self._crossfade_allowed(
             queue_item, smart_fades_mode=smart_fades_mode, flow_mode=False
         ):
             # no crossfade enabled/allowed, just yield the buffer last part
             bytes_written += len(buffer)
-            yield buffer
+            for _chunk in divide_chunks(buffer, pcm_format.pcm_sample_size):
+                yield _chunk
         else:
             # if crossfade is enabled, save fadeout part in buffer to pickup for next track
             fade_out_data = buffer
             buffer = b""
             # get next track for crossfade
+            self.logger.debug(
+                "Preloading NEXT track for crossfade for queue %s",
+                queue.display_name,
+            )
             try:
                 next_queue_item = await self.mass.player_queues.load_next_queue_item(
                     queue.queue_id, queue_item.queue_item_id
@@ -1539,14 +1565,13 @@ class StreamsController(CoreController):
                     mode=smart_fades_mode,
                 )
                 # send half of the crossfade_part (= approx the fadeout part)
-                crossfade_first, crossfade_second = (
-                    crossfade_bytes[: len(crossfade_bytes) // 2 + len(crossfade_bytes) % 2],
-                    crossfade_bytes[len(crossfade_bytes) // 2 + len(crossfade_bytes) % 2 :],
-                )
+                split_point = (len(crossfade_bytes) + 1) // 2
+                crossfade_first = crossfade_bytes[:split_point]
+                crossfade_second = crossfade_bytes[split_point:]
                 del crossfade_bytes
                 bytes_written += len(crossfade_first)
-                yield crossfade_first
-                del crossfade_first
+                for _chunk in divide_chunks(crossfade_first, pcm_format.pcm_sample_size):
+                    yield _chunk
                 # store the other half for the next track
                 self._crossfade_data[queue_item.queue_id] = CrossfadeData(
                     data=crossfade_second,
@@ -1554,9 +1579,10 @@ class StreamsController(CoreController):
                     pcm_format=pcm_format,
                     queue_item_id=next_queue_item.queue_item_id,
                 )
-
-            except QueueEmpty:
-                # end of queue reached or crossfade failed - no crossfade possible
+            except (QueueEmpty, AudioError):
+                # end of queue reached, next item  skipped or crossfade failed
+                # no crossfade possible, just yield the fade_out_data
+                next_queue_item = None
                 yield fade_out_data
                 bytes_written += len(fade_out_data)
                 del fade_out_data
@@ -1568,10 +1594,12 @@ class StreamsController(CoreController):
         streamdetails.seconds_streamed = seconds_streamed
         streamdetails.duration = streamdetails.seek_position + seconds_streamed
         self.logger.debug(
-            "Finished Streaming queue track: %s (%s) on queue %s",
+            "Finished Streaming queue track: %s (%s) on queue %s "
+            "- crossfade data prepared for next track: %s",
             queue_item.streamdetails.uri,
             queue_item.name,
             queue.display_name,
+            next_queue_item.name if next_queue_item else "N/A",
         )
 
     def _log_request(self, request: web.Request) -> None:
index 4015e13d28325cee7a5a3bd4d8a646514543a495..1a339d810544f05b6c0f0b8aa99ce593f3f7e30a 100644 (file)
@@ -457,6 +457,8 @@ async def get_buffered_media_stream(
             ):
                 chunk_count += 1
                 await audio_buffer.put(chunk)
+            # Only set EOF if we completed successfully
+            await audio_buffer.set_eof()
         except asyncio.CancelledError:
             status = "cancelled"
             raise
@@ -464,7 +466,6 @@ async def get_buffered_media_stream(
             status = "aborted with error"
             raise
         finally:
-            await audio_buffer.set_eof()
             LOGGER.log(
                 VERBOSE_LOG_LEVEL,
                 "fill_buffer_task: %s (%s chunks) for %s",
@@ -662,22 +663,21 @@ async def get_media_stream(
         logger.log(VERBOSE_LOG_LEVEL, "End of stream reached.")
         # wait until stderr also completed reading
         await ffmpeg_proc.wait_with_timeout(5)
+        if ffmpeg_proc.returncode not in (0, None):
+            log_trail = "\n".join(list(ffmpeg_proc.log_history)[-5:])
+            raise AudioError(f"FFMpeg exited with code {ffmpeg_proc.returncode}: {log_trail}")
         if bytes_sent == 0:
-            # edge case: no audio data was sent
+            # edge case: no audio data was received at all
             raise AudioError("No audio was received")
-        elif ffmpeg_proc.returncode not in (0, None):
-            raise AudioError(f"FFMpeg exited with code {ffmpeg_proc.returncode}")
         finished = True
     except (Exception, GeneratorExit, asyncio.CancelledError) as err:
         if isinstance(err, asyncio.CancelledError | GeneratorExit):
             # we were cancelled, just raise
             cancelled = True
             raise
-        logger.error("Error while streaming %s: %s", streamdetails.uri, err)
         # dump the last 10 lines of the log in case of an unclean exit
         logger.warning("\n".join(list(ffmpeg_proc.log_history)[-10:]))
-        streamdetails.stream_error = True
-        raise
+        raise AudioError(f"Error while streaming: {err}") from err
     finally:
         # always ensure close is called which also handles all cleanup
         await ffmpeg_proc.close()
index 17f4b8740077fd03f5aad18d536ac9078961527b..836cd1f35cea4eec9efbbff131387d8eba8e2e9c 100644 (file)
@@ -183,6 +183,10 @@ class AudioBuffer:
         self._last_access_time = time.time()
 
         async with self._data_available:
+            # Check if producer had an error - raise immediately
+            if self._producer_error:
+                raise self._producer_error
+
             # Check if the chunk was already discarded
             if chunk_number < self._discarded_chunks:
                 msg = (
@@ -194,10 +198,10 @@ class AudioBuffer:
             # Wait until the requested chunk is available or EOF
             buffer_index = chunk_number - self._discarded_chunks
             while buffer_index >= len(self._chunks):
+                # Check if producer had an error - raise immediately
+                if self._producer_error:
+                    raise self._producer_error
                 if self._eof_received:
-                    # Check if producer had an error before raising EOF
-                    if self._producer_error:
-                        raise self._producer_error
                     raise AudioBufferEOF
                 await self._data_available.wait()
                 buffer_index = chunk_number - self._discarded_chunks
index 1daffa7a3c516ae0424c67491247cdded32f7642..9ea5ea0c6e371ecdd06f60307193e33d51a89037 100644 (file)
@@ -276,17 +276,17 @@ def get_ffmpeg_args(  # noqa: PLR0915
                 "1",
                 # Set the maximum delay in seconds after which to give up reconnecting.
                 "-reconnect_delay_max",
-                "30",
+                "10",
                 # If set then even streamed/non seekable streams will be reconnected on errors.
                 "-reconnect_streamed",
                 "1",
                 # Reconnect automatically in case of TCP/TLS errors during connect.
                 "-reconnect_on_network_error",
-                "1",
+                "0",
                 # A comma separated list of HTTP status codes to reconnect on.
                 # The list can include specific status codes (e.g. 503) or the strings 4xx / 5xx.
                 "-reconnect_on_http_error",
-                "5xx,4xx",
+                "5xx,429",
             ]
         if input_format.content_type.is_pcm():
             input_args += [