bytes_sent += len(buffer)
yield buffer
del buffer
+ finished = True
+
+ finally:
+ await ffmpeg_proc.close()
if bytes_sent == 0:
# edge case: no audio data was sent
streamdetails.stream_error = True
- finished = False
+ seconds_streamed = 0
logger.warning("Stream error on %s", streamdetails.uri)
- # we send a bit of silence so players get at least some data
- # without it, some players refuse to skip to the next track
- async for chunk in get_silence(6, pcm_format):
- yield chunk
- bytes_sent += len(chunk)
else:
- finished = True
- finally:
- await ffmpeg_proc.close()
-
- # try to determine how many seconds we've streamed
- seconds_streamed = bytes_sent / pcm_format.pcm_sample_size if bytes_sent else 0
- logger.debug(
- "stream %s (with code %s) for %s - seconds streamed: %s",
- "finished" if finished else "aborted",
- ffmpeg_proc.returncode,
- streamdetails.uri,
- seconds_streamed,
- )
+ # try to determine how many seconds we've streamed
+ seconds_streamed = bytes_sent / pcm_format.pcm_sample_size if bytes_sent else 0
+ logger.debug(
+ "stream %s (with code %s) for %s - seconds streamed: %s",
+ "finished" if finished else "aborted",
+ ffmpeg_proc.returncode,
+ streamdetails.uri,
+ seconds_streamed,
+ )
streamdetails.seconds_streamed = seconds_streamed
# store accurate duration
if (
streamdetails.loudness is None
and streamdetails.volume_normalization_mode != VolumeNormalizationMode.DISABLED
- and (finished or (seconds_streamed >= 60))
+ and (finished or (seconds_streamed >= 300))
):
# if dynamic volume normalization is enabled and the entire track is streamed
# the loudnorm filter will output the measuremeet in the log,
streamdetails.provider,
)
)
+ # TODO: move this to the queue controller so we report
+ # actual playback time instead of buffered seconds
if music_prov := mass.get_provider(streamdetails.provider):
mass.create_task(music_prov.on_streamed(streamdetails, seconds_streamed))
StreamType,
)
from music_assistant.common.models.errors import (
+ AudioError,
LoginFailed,
MediaNotFoundError,
ResourceTemporarilyUnavailable,
async def get_stream_details(self, item_id: str) -> StreamDetails:
"""Return the content details for the given track when it will be streamed."""
- await self.login(force_fresh=True)
# fetch full track details
# this will also check if the track is available for streaming
# and use spotify's track linking feature to serve a substitute track
self, streamdetails: StreamDetails, seek_position: int = 0
) -> AsyncGenerator[bytes, None]:
"""Return the audio stream for the provider item."""
- auth_info = await self.login(force_fresh=True)
librespot = await self.get_librespot_binary()
spotify_uri = f"spotify://track:{streamdetails.item_id}"
- args = [
- librespot,
- "-c",
- CACHE_DIR,
- "-M",
- "256M",
- "--passthrough",
- "-b",
- "320",
- "--backend",
- "pipe",
- "--single-track",
- spotify_uri,
- "--token",
- auth_info["access_token"],
- ]
- if seek_position:
- args += ["--start-position", str(int(seek_position))]
- chunk_size = get_chunksize(streamdetails.audio_format)
- stderr = None if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else False
self.logger.log(VERBOSE_LOG_LEVEL, f"Start streaming {spotify_uri} using librespot")
- for attempt in range(1, 3):
+ for attempt in range(1, 4):
+ auth_info = await self.login(force_refresh=attempt == 2)
+ args = [
+ librespot,
+ "-c",
+ CACHE_DIR,
+ "-M",
+ "256M",
+ "--passthrough",
+ "-b",
+ "320",
+ "--backend",
+ "pipe",
+ "--single-track",
+ spotify_uri,
+ "--token",
+ auth_info["access_token"],
+ ]
+ if seek_position:
+ args += ["--start-position", str(int(seek_position))]
+ chunk_size = get_chunksize(streamdetails.audio_format)
+ stderr = None if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else False
async with AsyncProcess(
args,
stdout=True,
stderr=stderr,
name="librespot",
) as librespot_proc:
- chunks_received = 0
async for chunk in librespot_proc.iter_any(chunk_size):
yield chunk
- chunks_received += 1
- if chunks_received:
- break
- self.logger.warning(
+ if librespot_proc.returncode == 0:
+ return
+ self.logger.debug(
"librespot failed to stream track, retrying... (attempt %s/3)", attempt
)
- await asyncio.sleep(0.1)
+ raise AudioError(f"Failed to stream track {spotify_uri} after 3 attempts")
def _parse_artist(self, artist_obj):
"""Parse spotify artist object to generic layout."""
return playlist
@lock
- async def login(self, force_fresh: bool = False) -> dict:
+ async def login(self, force_refresh: bool = False) -> dict:
"""Log-in Spotify and return Auth/token info."""
# return existing token if we have one in memory
- if self._auth_info and (
- self._auth_info["expires_at"] > (time.time() - 600 if force_fresh else 300)
+ if (
+ not force_refresh
+ and self._auth_info
+ and (self._auth_info["expires_at"] > (time.time() - 600))
):
return self._auth_info
# request new access token using the refresh token