From: Marcel van der Veldt Date: Fri, 2 May 2025 15:02:35 +0000 (+0200) Subject: Fix: Continue playback when error ocurred writing audio cache X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=60b951a314fa9a3e72fc69ff2783ee61b5118b3a;p=music-assistant-server.git Fix: Continue playback when error ocurred writing audio cache --- diff --git a/music_assistant/controllers/player_queues.py b/music_assistant/controllers/player_queues.py index 2d5f0584..0d632cd4 100644 --- a/music_assistant/controllers/player_queues.py +++ b/music_assistant/controllers/player_queues.py @@ -33,6 +33,7 @@ from music_assistant_models.enums import ( RepeatMode, ) from music_assistant_models.errors import ( + AudioError, InvalidCommand, InvalidDataError, MediaNotFoundError, @@ -842,7 +843,7 @@ class PlayerQueuesController(CoreController): queue.current_index = index queue.current_item = queue_item break - except MediaNotFoundError: + except (MediaNotFoundError, AudioError): # the requested index can not be played. self.logger.warning( "Skipping unplayable item %s (%s)", queue_item.name, queue_item.uri @@ -1030,7 +1031,7 @@ class PlayerQueuesController(CoreController): # we're all set, this is our next item next_item = queue_item break - except MediaNotFoundError: + except (MediaNotFoundError, AudioError): # No stream details found, skip this QueueItem self.logger.warning( "Skipping unplayable item %s (%s)", queue_item.name, queue_item.uri diff --git a/music_assistant/helpers/audio.py b/music_assistant/helpers/audio.py index 85620486..aa231bf4 100644 --- a/music_assistant/helpers/audio.py +++ b/music_assistant/helpers/audio.py @@ -97,6 +97,7 @@ class StreamCache: # cache file already exists from a previous session, # we can simply use that, there is nothing to create CACHE_FILES_IN_USE.add(self._cache_file) + self._all_data_written = True return else: # create new cache file @@ -114,6 +115,10 @@ class StreamCache: self._fetch_task = self.mass.create_task(self._create_cache_file()) # wait until the first part of the file is received await self._first_part_received.wait() + if self._stream_error: + # an error occurred while creating the cache file + # remove the cache file and raise an error + raise AudioError(self._stream_error) def release(self) -> None: """Release the cache file.""" @@ -130,12 +135,14 @@ class StreamCache: stream the (intermediate) audio data from the cache file. """ self._subscribers += 1 + assert self._cache_file is not None # type guard # mark file as in-use to prevent it being deleted CACHE_FILES_IN_USE.add(self._cache_file) async def _stream_from_cache() -> AsyncGenerator[bytes, None]: chunksize = get_chunksize(self.streamdetails.audio_format, 1) wait_loops = 0 + assert self._cache_file is not None # type guard async with aiofiles.open(self._cache_file, "rb") as file: while wait_loops < 2000: chunk = await file.read(chunksize) @@ -143,7 +150,7 @@ class StreamCache: yield chunk await asyncio.sleep(0) # yield to eventloop del chunk - elif self._all_data_written.is_set(): + elif self._all_data_written: # reached EOF break else: @@ -152,13 +159,9 @@ class StreamCache: # prevent an infinite loop in case of an error wait_loops += 1 - if await asyncio.to_thread(os.path.exists, self._cache_file): - if self._fetch_task is None: - # a complete cache file already exists on disk (from a previous run) - return self._cache_file - if self._all_data_written.is_set(): - # cache file was created recently but ready - return self._cache_file + if self._all_data_written: + # cache file is ready + return self._cache_file # cache file does not exist at all (or is still being written) await self.create() @@ -169,8 +172,8 @@ class StreamCache: self.logger.debug("Creating audio cache for %s", self.streamdetails.uri) CACHE_FILES_IN_USE.add(self._cache_file) self._first_part_received.clear() - self._all_data_written.clear() - extra_input_args = self.org_extra_input_args or [] + self._all_data_written = False + extra_input_args = ["-y", *(self.org_extra_input_args or [])] if self.org_stream_type == StreamType.CUSTOM: audio_source = self.mass.get_provider(self.streamdetails.provider).get_audio_stream( self.streamdetails, @@ -234,22 +237,26 @@ class StreamCache: raise AudioError(f"FFMpeg error {ffmpeg_proc.returncode}") # set 'all data written' event to signal that the entire file is ready - self._all_data_written.set() + self._all_data_written = True self.logger.debug( "Writing all data for %s done in %.2fs", self.streamdetails.uri, time.time() - time_start, ) - except Exception as err: + except BaseException as err: self.logger.error("Error while creating cache for %s: %s", self.streamdetails.uri, err) # make sure that the (corrupted/incomplete) cache file is removed await self._remove_cache_file() + # unblock the waiting tasks by setting the event + # this will allow the tasks to continue and handle the error + self._stream_error = str(err) or err.__qualname__ + self._first_part_received.set() finally: await ffmpeg_proc.close() async def _remove_cache_file(self) -> None: self._first_part_received.clear() - self._all_data_written.clear() + self._all_data_written = False self._fetch_task = None await remove_file(self._cache_file) @@ -262,7 +269,8 @@ class StreamCache: self._fetch_task: asyncio.Task | None = None self._subscribers: int = 0 self._first_part_received = asyncio.Event() - self._all_data_written = asyncio.Event() + self._all_data_written: bool = False + self._stream_error: str | None = None self.org_path: str | None = streamdetails.path self.org_stream_type: StreamType | None = streamdetails.stream_type self.org_extra_input_args: list[str] | None = streamdetails.extra_input_args