raise MediaNotFoundError(
f"Unable to retrieve streamdetails for {queue_item.name} ({queue_item.uri})"
)
+ buffer: AudioBuffer | None = None
if queue_item.streamdetails and (
(utc() - queue_item.streamdetails.created_at).seconds < STREAMDETAILS_EXPIRATION
- or queue_item.streamdetails.buffer
+ or ((buffer := queue_item.streamdetails.buffer) and buffer.is_valid(seek_position))
):
# already got a fresh/unused (or cached) streamdetails
# we assume that the streamdetails are valid for max STREAMDETAILS_EXPIRATION seconds
"""Return number of seconds of audio currently available in the buffer."""
return len(self._chunks)
- def is_valid(self, checksum: str, seek_position: int = 0) -> bool:
+ def is_valid(self, checksum: str | None = None, seek_position: int = 0) -> bool:
"""
Validate the buffer's checksum and check if seek position is available.
if self.cancelled:
return False
- if self.checksum != checksum:
+ if checksum is not None and self.checksum != checksum:
return False
# Check if buffer is close to inactivity timeout (within 30 seconds)