Fix: Continue playback when error ocurred writing audio cache
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Fri, 2 May 2025 15:02:35 +0000 (17:02 +0200)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Fri, 2 May 2025 15:02:35 +0000 (17:02 +0200)
music_assistant/controllers/player_queues.py
music_assistant/helpers/audio.py

index 2d5f05846307c5464b3978b47d8cba5c3c924610..0d632cd4e4966a168d639a2279595b54fc810097 100644 (file)
@@ -33,6 +33,7 @@ from music_assistant_models.enums import (
     RepeatMode,
 )
 from music_assistant_models.errors import (
+    AudioError,
     InvalidCommand,
     InvalidDataError,
     MediaNotFoundError,
@@ -842,7 +843,7 @@ class PlayerQueuesController(CoreController):
                     queue.current_index = index
                     queue.current_item = queue_item
                     break
-                except MediaNotFoundError:
+                except (MediaNotFoundError, AudioError):
                     # the requested index can not be played.
                     self.logger.warning(
                         "Skipping unplayable item %s (%s)", queue_item.name, queue_item.uri
@@ -1030,7 +1031,7 @@ class PlayerQueuesController(CoreController):
                 # we're all set, this is our next item
                 next_item = queue_item
                 break
-            except MediaNotFoundError:
+            except (MediaNotFoundError, AudioError):
                 # No stream details found, skip this QueueItem
                 self.logger.warning(
                     "Skipping unplayable item %s (%s)", queue_item.name, queue_item.uri
index 85620486521030e8f859b056abc2bef4e1a9c4d1..aa231bf4e37e3b6a1030fc804839c8581a984c13 100644 (file)
@@ -97,6 +97,7 @@ class StreamCache:
                     # cache file already exists from a previous session,
                     # we can simply use that, there is nothing to create
                     CACHE_FILES_IN_USE.add(self._cache_file)
+                    self._all_data_written = True
                     return
             else:
                 # create new cache file
@@ -114,6 +115,10 @@ class StreamCache:
             self._fetch_task = self.mass.create_task(self._create_cache_file())
         # wait until the first part of the file is received
         await self._first_part_received.wait()
+        if self._stream_error:
+            # an error occurred while creating the cache file
+            # remove the cache file and raise an error
+            raise AudioError(self._stream_error)
 
     def release(self) -> None:
         """Release the cache file."""
@@ -130,12 +135,14 @@ class StreamCache:
         stream the (intermediate) audio data from the cache file.
         """
         self._subscribers += 1
+        assert self._cache_file is not None  # type guard
         # mark file as in-use to prevent it being deleted
         CACHE_FILES_IN_USE.add(self._cache_file)
 
         async def _stream_from_cache() -> AsyncGenerator[bytes, None]:
             chunksize = get_chunksize(self.streamdetails.audio_format, 1)
             wait_loops = 0
+            assert self._cache_file is not None  # type guard
             async with aiofiles.open(self._cache_file, "rb") as file:
                 while wait_loops < 2000:
                     chunk = await file.read(chunksize)
@@ -143,7 +150,7 @@ class StreamCache:
                         yield chunk
                         await asyncio.sleep(0)  # yield to eventloop
                         del chunk
-                    elif self._all_data_written.is_set():
+                    elif self._all_data_written:
                         # reached EOF
                         break
                     else:
@@ -152,13 +159,9 @@ class StreamCache:
                         # prevent an infinite loop in case of an error
                         wait_loops += 1
 
-        if await asyncio.to_thread(os.path.exists, self._cache_file):
-            if self._fetch_task is None:
-                # a complete cache file already exists on disk (from a previous run)
-                return self._cache_file
-            if self._all_data_written.is_set():
-                # cache file was created recently but ready
-                return self._cache_file
+        if self._all_data_written:
+            # cache file is ready
+            return self._cache_file
 
         # cache file does not exist at all (or is still being written)
         await self.create()
@@ -169,8 +172,8 @@ class StreamCache:
         self.logger.debug("Creating audio cache for %s", self.streamdetails.uri)
         CACHE_FILES_IN_USE.add(self._cache_file)
         self._first_part_received.clear()
-        self._all_data_written.clear()
-        extra_input_args = self.org_extra_input_args or []
+        self._all_data_written = False
+        extra_input_args = ["-y", *(self.org_extra_input_args or [])]
         if self.org_stream_type == StreamType.CUSTOM:
             audio_source = self.mass.get_provider(self.streamdetails.provider).get_audio_stream(
                 self.streamdetails,
@@ -234,22 +237,26 @@ class StreamCache:
                 raise AudioError(f"FFMpeg error {ffmpeg_proc.returncode}")
 
             # set 'all data written' event to signal that the entire file is ready
-            self._all_data_written.set()
+            self._all_data_written = True
             self.logger.debug(
                 "Writing all data for %s done in %.2fs",
                 self.streamdetails.uri,
                 time.time() - time_start,
             )
-        except Exception as err:
+        except BaseException as err:
             self.logger.error("Error while creating cache for %s: %s", self.streamdetails.uri, err)
             # make sure that the (corrupted/incomplete) cache file is removed
             await self._remove_cache_file()
+            # unblock the waiting tasks by setting the event
+            # this will allow the tasks to continue and handle the error
+            self._stream_error = str(err) or err.__qualname__
+            self._first_part_received.set()
         finally:
             await ffmpeg_proc.close()
 
     async def _remove_cache_file(self) -> None:
         self._first_part_received.clear()
-        self._all_data_written.clear()
+        self._all_data_written = False
         self._fetch_task = None
         await remove_file(self._cache_file)
 
@@ -262,7 +269,8 @@ class StreamCache:
         self._fetch_task: asyncio.Task | None = None
         self._subscribers: int = 0
         self._first_part_received = asyncio.Event()
-        self._all_data_written = asyncio.Event()
+        self._all_data_written: bool = False
+        self._stream_error: str | None = None
         self.org_path: str | None = streamdetails.path
         self.org_stream_type: StreamType | None = streamdetails.stream_type
         self.org_extra_input_args: list[str] | None = streamdetails.extra_input_args