From: Marcel van der Veldt Date: Mon, 3 Nov 2025 22:35:53 +0000 (+0100) Subject: Better handling of errors during streaming X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=8b900d7eaf3d65e1b55d182baeb45e558bbf4d83;p=music-assistant-server.git Better handling of errors during streaming Try to recover from a failed stream while streaming queue tracks --- diff --git a/music_assistant/controllers/streams.py b/music_assistant/controllers/streams.py index fb339d08..9684608c 100644 --- a/music_assistant/controllers/streams.py +++ b/music_assistant/controllers/streams.py @@ -28,7 +28,7 @@ from music_assistant_models.enums import ( StreamType, VolumeNormalizationMode, ) -from music_assistant_models.errors import QueueEmpty +from music_assistant_models.errors import AudioError, QueueEmpty from music_assistant_models.media_items import AudioFormat from music_assistant_models.player_queue import PlayLogEntry @@ -62,7 +62,6 @@ from music_assistant.helpers.audio import ( get_chunksize, get_media_stream, get_player_filter_params, - get_silence, get_stream_details, resample_pcm_audio, ) @@ -327,8 +326,7 @@ class StreamsController(CoreController): ) # Start periodic garbage collection task # This ensures memory from audio buffers and streams is cleaned up regularly - # DISABLED FOR TESTING - may cause event loop blocking - # self.mass.call_later(900, self._periodic_garbage_collection) # 15 minutes + self.mass.call_later(900, self._periodic_garbage_collection) # 15 minutes async def close(self) -> None: """Cleanup on exit.""" @@ -535,18 +533,13 @@ class StreamsController(CoreController): break if queue_item.streamdetails.stream_error: self.logger.error( - "Error streaming QueueItem %s (%s) to %s", + "Error streaming QueueItem %s (%s) to %s - will try to skip to next item", queue_item.name, queue_item.uri, queue.display_name, ) - # some players do not like it when we dont return anything after an error - # so we send some silence so they move on to the next track on their own (hopefully) - async for chunk in get_silence(10, output_format): - try: - await resp.write(chunk) - except (BrokenPipeError, ConnectionResetError, ConnectionError): - break + # try to skip to the next item in the queue after a short delay + self.mass.call_later(5, self.mass.player_queues.next(queue_id)) return resp async def serve_queue_flow_stream(self, request: web.Request) -> web.Response: @@ -1018,6 +1011,7 @@ class StreamsController(CoreController): queue_track, pcm_format=pcm_format, seek_position=queue_track.streamdetails.seek_position, + raise_on_error=False, ): if not first_chunk_received: first_chunk_received = True @@ -1084,7 +1078,9 @@ class StreamsController(CoreController): #### HANDLE END OF TRACK if last_fadeout_part: # edge case: we did not get enough data to make the crossfade - yield last_fadeout_part + for _chunk in divide_chunks(last_fadeout_part, pcm_sample_size): + yield _chunk + del _chunk bytes_written += len(last_fadeout_part) last_fadeout_part = b"" if self._crossfade_allowed( @@ -1102,7 +1098,9 @@ class StreamsController(CoreController): elif buffer: # no crossfade enabled, just yield the buffer last part bytes_written += len(buffer) - yield buffer + for _chunk in divide_chunks(buffer, pcm_sample_size): + yield _chunk + del _chunk # make sure the buffer gets cleaned up del buffer @@ -1125,7 +1123,9 @@ class StreamsController(CoreController): #### HANDLE END OF QUEUE FLOW STREAM # end of queue flow: make sure we yield the last_fadeout_part if last_fadeout_part: - yield last_fadeout_part + for _chunk in divide_chunks(last_fadeout_part, pcm_sample_size): + yield _chunk + del _chunk # correct seconds streamed/duration last_part_seconds = len(last_fadeout_part) / pcm_sample_size queue_track.streamdetails.seconds_streamed += last_part_seconds @@ -1232,6 +1232,7 @@ class StreamsController(CoreController): queue_item: QueueItem, pcm_format: AudioFormat, seek_position: int = 0, + raise_on_error: bool = True, ) -> AsyncGenerator[bytes, None]: """Get the (PCM) audio stream for a single queue item.""" # collect all arguments for ffmpeg @@ -1338,9 +1339,20 @@ class StreamsController(CoreController): yield chunk # help garbage collection by explicitly deleting chunk del chunk - if not bytes_received: - self.logger.error("No audio data received from source for %s", queue_item.name) finished = True + except AudioError as err: + queue_item.streamdetails.stream_error = True + queue_item.available = False + if raise_on_error: + raise + # yes, we swallow the error here after logging it + # so the outer stream can handle it gracefully + self.logger.error( + "AudioError while streaming queue item %s (%s): %s", + queue_item.name, + queue_item.streamdetails.uri, + err, + ) finally: # determine how many seconds we've streamed # for pcm output we can calculate this easily @@ -1376,9 +1388,12 @@ class StreamsController(CoreController): streamdetails = queue_item.streamdetails assert streamdetails - crossfade_data = self._crossfade_data.get(queue.queue_id) + crossfade_data = self._crossfade_data.pop(queue.queue_id, None) - if crossfade_data and crossfade_data.queue_item_id != queue_item.queue_item_id: + if crossfade_data and streamdetails.seek_position > 0: + # don't do crossfade when seeking into track + crossfade_data = None + if crossfade_data and (crossfade_data.queue_item_id != queue_item.queue_item_id): # edge case alert: the next item changed just while we were preloading/crossfading self.logger.warning( "Skipping crossfade data for queue %s - next item changed!", queue.display_name @@ -1448,12 +1463,16 @@ class StreamsController(CoreController): # send the (second half of the) crossfade data if crossfade_data.pcm_format != pcm_format: # edge case: pcm format mismatch, we need to resample - crossfade_data.data = await resample_pcm_audio( + resampled_data = await resample_pcm_audio( crossfade_data.data, crossfade_data.pcm_format, pcm_format, ) - yield crossfade_data.data + for _chunk in divide_chunks(resampled_data, pcm_format.pcm_sample_size): + yield _chunk + else: + for _chunk in divide_chunks(crossfade_data.data, pcm_format.pcm_sample_size): + yield _chunk bytes_written += len(crossfade_data.data) # clear vars crossfade_data = None @@ -1476,20 +1495,27 @@ class StreamsController(CoreController): crossfade_data.pcm_format, pcm_format, ) - yield crossfade_data.data + for _chunk in divide_chunks(crossfade_data.data, pcm_format.pcm_sample_size): + yield _chunk bytes_written += len(crossfade_data.data) crossfade_data = None + next_queue_item: QueueItem | None = None if not self._crossfade_allowed( queue_item, smart_fades_mode=smart_fades_mode, flow_mode=False ): # no crossfade enabled/allowed, just yield the buffer last part bytes_written += len(buffer) - yield buffer + for _chunk in divide_chunks(buffer, pcm_format.pcm_sample_size): + yield _chunk else: # if crossfade is enabled, save fadeout part in buffer to pickup for next track fade_out_data = buffer buffer = b"" # get next track for crossfade + self.logger.debug( + "Preloading NEXT track for crossfade for queue %s", + queue.display_name, + ) try: next_queue_item = await self.mass.player_queues.load_next_queue_item( queue.queue_id, queue_item.queue_item_id @@ -1539,14 +1565,13 @@ class StreamsController(CoreController): mode=smart_fades_mode, ) # send half of the crossfade_part (= approx the fadeout part) - crossfade_first, crossfade_second = ( - crossfade_bytes[: len(crossfade_bytes) // 2 + len(crossfade_bytes) % 2], - crossfade_bytes[len(crossfade_bytes) // 2 + len(crossfade_bytes) % 2 :], - ) + split_point = (len(crossfade_bytes) + 1) // 2 + crossfade_first = crossfade_bytes[:split_point] + crossfade_second = crossfade_bytes[split_point:] del crossfade_bytes bytes_written += len(crossfade_first) - yield crossfade_first - del crossfade_first + for _chunk in divide_chunks(crossfade_first, pcm_format.pcm_sample_size): + yield _chunk # store the other half for the next track self._crossfade_data[queue_item.queue_id] = CrossfadeData( data=crossfade_second, @@ -1554,9 +1579,10 @@ class StreamsController(CoreController): pcm_format=pcm_format, queue_item_id=next_queue_item.queue_item_id, ) - - except QueueEmpty: - # end of queue reached or crossfade failed - no crossfade possible + except (QueueEmpty, AudioError): + # end of queue reached, next item skipped or crossfade failed + # no crossfade possible, just yield the fade_out_data + next_queue_item = None yield fade_out_data bytes_written += len(fade_out_data) del fade_out_data @@ -1568,10 +1594,12 @@ class StreamsController(CoreController): streamdetails.seconds_streamed = seconds_streamed streamdetails.duration = streamdetails.seek_position + seconds_streamed self.logger.debug( - "Finished Streaming queue track: %s (%s) on queue %s", + "Finished Streaming queue track: %s (%s) on queue %s " + "- crossfade data prepared for next track: %s", queue_item.streamdetails.uri, queue_item.name, queue.display_name, + next_queue_item.name if next_queue_item else "N/A", ) def _log_request(self, request: web.Request) -> None: diff --git a/music_assistant/helpers/audio.py b/music_assistant/helpers/audio.py index 4015e13d..1a339d81 100644 --- a/music_assistant/helpers/audio.py +++ b/music_assistant/helpers/audio.py @@ -457,6 +457,8 @@ async def get_buffered_media_stream( ): chunk_count += 1 await audio_buffer.put(chunk) + # Only set EOF if we completed successfully + await audio_buffer.set_eof() except asyncio.CancelledError: status = "cancelled" raise @@ -464,7 +466,6 @@ async def get_buffered_media_stream( status = "aborted with error" raise finally: - await audio_buffer.set_eof() LOGGER.log( VERBOSE_LOG_LEVEL, "fill_buffer_task: %s (%s chunks) for %s", @@ -662,22 +663,21 @@ async def get_media_stream( logger.log(VERBOSE_LOG_LEVEL, "End of stream reached.") # wait until stderr also completed reading await ffmpeg_proc.wait_with_timeout(5) + if ffmpeg_proc.returncode not in (0, None): + log_trail = "\n".join(list(ffmpeg_proc.log_history)[-5:]) + raise AudioError(f"FFMpeg exited with code {ffmpeg_proc.returncode}: {log_trail}") if bytes_sent == 0: - # edge case: no audio data was sent + # edge case: no audio data was received at all raise AudioError("No audio was received") - elif ffmpeg_proc.returncode not in (0, None): - raise AudioError(f"FFMpeg exited with code {ffmpeg_proc.returncode}") finished = True except (Exception, GeneratorExit, asyncio.CancelledError) as err: if isinstance(err, asyncio.CancelledError | GeneratorExit): # we were cancelled, just raise cancelled = True raise - logger.error("Error while streaming %s: %s", streamdetails.uri, err) # dump the last 10 lines of the log in case of an unclean exit logger.warning("\n".join(list(ffmpeg_proc.log_history)[-10:])) - streamdetails.stream_error = True - raise + raise AudioError(f"Error while streaming: {err}") from err finally: # always ensure close is called which also handles all cleanup await ffmpeg_proc.close() diff --git a/music_assistant/helpers/audio_buffer.py b/music_assistant/helpers/audio_buffer.py index 17f4b874..836cd1f3 100644 --- a/music_assistant/helpers/audio_buffer.py +++ b/music_assistant/helpers/audio_buffer.py @@ -183,6 +183,10 @@ class AudioBuffer: self._last_access_time = time.time() async with self._data_available: + # Check if producer had an error - raise immediately + if self._producer_error: + raise self._producer_error + # Check if the chunk was already discarded if chunk_number < self._discarded_chunks: msg = ( @@ -194,10 +198,10 @@ class AudioBuffer: # Wait until the requested chunk is available or EOF buffer_index = chunk_number - self._discarded_chunks while buffer_index >= len(self._chunks): + # Check if producer had an error - raise immediately + if self._producer_error: + raise self._producer_error if self._eof_received: - # Check if producer had an error before raising EOF - if self._producer_error: - raise self._producer_error raise AudioBufferEOF await self._data_available.wait() buffer_index = chunk_number - self._discarded_chunks diff --git a/music_assistant/helpers/ffmpeg.py b/music_assistant/helpers/ffmpeg.py index 1daffa7a..9ea5ea0c 100644 --- a/music_assistant/helpers/ffmpeg.py +++ b/music_assistant/helpers/ffmpeg.py @@ -276,17 +276,17 @@ def get_ffmpeg_args( # noqa: PLR0915 "1", # Set the maximum delay in seconds after which to give up reconnecting. "-reconnect_delay_max", - "30", + "10", # If set then even streamed/non seekable streams will be reconnected on errors. "-reconnect_streamed", "1", # Reconnect automatically in case of TCP/TLS errors during connect. "-reconnect_on_network_error", - "1", + "0", # A comma separated list of HTTP status codes to reconnect on. # The list can include specific status codes (e.g. 503) or the strings 4xx / 5xx. "-reconnect_on_http_error", - "5xx,4xx", + "5xx,429", ] if input_format.content_type.is_pcm(): input_args += [