# cache file already exists from a previous session,
# we can simply use that, there is nothing to create
CACHE_FILES_IN_USE.add(self._cache_file)
+ self._all_data_written = True
return
else:
# create new cache file
self._fetch_task = self.mass.create_task(self._create_cache_file())
# wait until the first part of the file is received
await self._first_part_received.wait()
+ if self._stream_error:
+ # an error occurred while creating the cache file
+ # remove the cache file and raise an error
+ raise AudioError(self._stream_error)
def release(self) -> None:
"""Release the cache file."""
stream the (intermediate) audio data from the cache file.
"""
self._subscribers += 1
+ assert self._cache_file is not None # type guard
# mark file as in-use to prevent it being deleted
CACHE_FILES_IN_USE.add(self._cache_file)
async def _stream_from_cache() -> AsyncGenerator[bytes, None]:
chunksize = get_chunksize(self.streamdetails.audio_format, 1)
wait_loops = 0
+ assert self._cache_file is not None # type guard
async with aiofiles.open(self._cache_file, "rb") as file:
while wait_loops < 2000:
chunk = await file.read(chunksize)
yield chunk
await asyncio.sleep(0) # yield to eventloop
del chunk
- elif self._all_data_written.is_set():
+ elif self._all_data_written:
# reached EOF
break
else:
# prevent an infinite loop in case of an error
wait_loops += 1
- if await asyncio.to_thread(os.path.exists, self._cache_file):
- if self._fetch_task is None:
- # a complete cache file already exists on disk (from a previous run)
- return self._cache_file
- if self._all_data_written.is_set():
- # cache file was created recently but ready
- return self._cache_file
+ if self._all_data_written:
+ # cache file is ready
+ return self._cache_file
# cache file does not exist at all (or is still being written)
await self.create()
self.logger.debug("Creating audio cache for %s", self.streamdetails.uri)
CACHE_FILES_IN_USE.add(self._cache_file)
self._first_part_received.clear()
- self._all_data_written.clear()
- extra_input_args = self.org_extra_input_args or []
+ self._all_data_written = False
+ extra_input_args = ["-y", *(self.org_extra_input_args or [])]
if self.org_stream_type == StreamType.CUSTOM:
audio_source = self.mass.get_provider(self.streamdetails.provider).get_audio_stream(
self.streamdetails,
raise AudioError(f"FFMpeg error {ffmpeg_proc.returncode}")
# set 'all data written' event to signal that the entire file is ready
- self._all_data_written.set()
+ self._all_data_written = True
self.logger.debug(
"Writing all data for %s done in %.2fs",
self.streamdetails.uri,
time.time() - time_start,
)
- except Exception as err:
+ except BaseException as err:
self.logger.error("Error while creating cache for %s: %s", self.streamdetails.uri, err)
# make sure that the (corrupted/incomplete) cache file is removed
await self._remove_cache_file()
+ # unblock the waiting tasks by setting the event
+ # this will allow the tasks to continue and handle the error
+ self._stream_error = str(err) or err.__qualname__
+ self._first_part_received.set()
finally:
await ffmpeg_proc.close()
async def _remove_cache_file(self) -> None:
self._first_part_received.clear()
- self._all_data_written.clear()
+ self._all_data_written = False
self._fetch_task = None
await remove_file(self._cache_file)
self._fetch_task: asyncio.Task | None = None
self._subscribers: int = 0
self._first_part_received = asyncio.Event()
- self._all_data_written = asyncio.Event()
+ self._all_data_written: bool = False
+ self._stream_error: str | None = None
self.org_path: str | None = streamdetails.path
self.org_stream_type: StreamType | None = streamdetails.stream_type
self.org_extra_input_args: list[str] | None = streamdetails.extra_input_args