From d2f69eeadaebd354bc6dd7cafbf7ba2d23c3c4cb Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Fri, 20 Sep 2024 16:27:18 +0200 Subject: [PATCH] Better handling of Spotify stream error (due to token expiration) (#1676) --- music_assistant/server/helpers/audio.py | 37 +++++------ music_assistant/server/helpers/ffmpeg.py | 4 ++ .../server/providers/spotify/__init__.py | 64 +++++++++---------- 3 files changed, 53 insertions(+), 52 deletions(-) diff --git a/music_assistant/server/helpers/audio.py b/music_assistant/server/helpers/audio.py index 9bfe1623..559b13fd 100644 --- a/music_assistant/server/helpers/audio.py +++ b/music_assistant/server/helpers/audio.py @@ -349,31 +349,26 @@ async def get_media_stream( bytes_sent += len(buffer) yield buffer del buffer + finished = True + + finally: + await ffmpeg_proc.close() if bytes_sent == 0: # edge case: no audio data was sent streamdetails.stream_error = True - finished = False + seconds_streamed = 0 logger.warning("Stream error on %s", streamdetails.uri) - # we send a bit of silence so players get at least some data - # without it, some players refuse to skip to the next track - async for chunk in get_silence(6, pcm_format): - yield chunk - bytes_sent += len(chunk) else: - finished = True - finally: - await ffmpeg_proc.close() - - # try to determine how many seconds we've streamed - seconds_streamed = bytes_sent / pcm_format.pcm_sample_size if bytes_sent else 0 - logger.debug( - "stream %s (with code %s) for %s - seconds streamed: %s", - "finished" if finished else "aborted", - ffmpeg_proc.returncode, - streamdetails.uri, - seconds_streamed, - ) + # try to determine how many seconds we've streamed + seconds_streamed = bytes_sent / pcm_format.pcm_sample_size if bytes_sent else 0 + logger.debug( + "stream %s (with code %s) for %s - seconds streamed: %s", + "finished" if finished else "aborted", + ffmpeg_proc.returncode, + streamdetails.uri, + seconds_streamed, + ) streamdetails.seconds_streamed = seconds_streamed # store accurate duration @@ -384,7 +379,7 @@ async def get_media_stream( if ( streamdetails.loudness is None and streamdetails.volume_normalization_mode != VolumeNormalizationMode.DISABLED - and (finished or (seconds_streamed >= 60)) + and (finished or (seconds_streamed >= 300)) ): # if dynamic volume normalization is enabled and the entire track is streamed # the loudnorm filter will output the measuremeet in the log, @@ -419,6 +414,8 @@ async def get_media_stream( streamdetails.provider, ) ) + # TODO: move this to the queue controller so we report + # actual playback time instead of buffered seconds if music_prov := mass.get_provider(streamdetails.provider): mass.create_task(music_prov.on_streamed(streamdetails, seconds_streamed)) diff --git a/music_assistant/server/helpers/ffmpeg.py b/music_assistant/server/helpers/ffmpeg.py index 490beafe..0aaa9dcf 100644 --- a/music_assistant/server/helpers/ffmpeg.py +++ b/music_assistant/server/helpers/ffmpeg.py @@ -120,10 +120,14 @@ class FFMpeg(AsyncProcess): if TYPE_CHECKING: self.audio_input: AsyncGenerator[bytes, None] generator_exhausted = False + audio_received = False try: async for chunk in self.audio_input: + audio_received = True await self.write(chunk) generator_exhausted = True + if not audio_received: + raise AudioError("No audio data received from source") except Exception as err: if isinstance(err, asyncio.CancelledError): return diff --git a/music_assistant/server/providers/spotify/__init__.py b/music_assistant/server/providers/spotify/__init__.py index 4f3b3326..7bd49696 100644 --- a/music_assistant/server/providers/spotify/__init__.py +++ b/music_assistant/server/providers/spotify/__init__.py @@ -20,6 +20,7 @@ from music_assistant.common.models.enums import ( StreamType, ) from music_assistant.common.models.errors import ( + AudioError, LoginFailed, MediaNotFoundError, ResourceTemporarilyUnavailable, @@ -539,7 +540,6 @@ class SpotifyProvider(MusicProvider): async def get_stream_details(self, item_id: str) -> StreamDetails: """Return the content details for the given track when it will be streamed.""" - await self.login(force_fresh=True) # fetch full track details # this will also check if the track is available for streaming # and use spotify's track linking feature to serve a substitute track @@ -558,47 +558,45 @@ class SpotifyProvider(MusicProvider): self, streamdetails: StreamDetails, seek_position: int = 0 ) -> AsyncGenerator[bytes, None]: """Return the audio stream for the provider item.""" - auth_info = await self.login(force_fresh=True) librespot = await self.get_librespot_binary() spotify_uri = f"spotify://track:{streamdetails.item_id}" - args = [ - librespot, - "-c", - CACHE_DIR, - "-M", - "256M", - "--passthrough", - "-b", - "320", - "--backend", - "pipe", - "--single-track", - spotify_uri, - "--token", - auth_info["access_token"], - ] - if seek_position: - args += ["--start-position", str(int(seek_position))] - chunk_size = get_chunksize(streamdetails.audio_format) - stderr = None if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else False self.logger.log(VERBOSE_LOG_LEVEL, f"Start streaming {spotify_uri} using librespot") - for attempt in range(1, 3): + for attempt in range(1, 4): + auth_info = await self.login(force_refresh=attempt == 2) + args = [ + librespot, + "-c", + CACHE_DIR, + "-M", + "256M", + "--passthrough", + "-b", + "320", + "--backend", + "pipe", + "--single-track", + spotify_uri, + "--token", + auth_info["access_token"], + ] + if seek_position: + args += ["--start-position", str(int(seek_position))] + chunk_size = get_chunksize(streamdetails.audio_format) + stderr = None if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else False async with AsyncProcess( args, stdout=True, stderr=stderr, name="librespot", ) as librespot_proc: - chunks_received = 0 async for chunk in librespot_proc.iter_any(chunk_size): yield chunk - chunks_received += 1 - if chunks_received: - break - self.logger.warning( + if librespot_proc.returncode == 0: + return + self.logger.debug( "librespot failed to stream track, retrying... (attempt %s/3)", attempt ) - await asyncio.sleep(0.1) + raise AudioError(f"Failed to stream track {spotify_uri} after 3 attempts") def _parse_artist(self, artist_obj): """Parse spotify artist object to generic layout.""" @@ -781,11 +779,13 @@ class SpotifyProvider(MusicProvider): return playlist @lock - async def login(self, force_fresh: bool = False) -> dict: + async def login(self, force_refresh: bool = False) -> dict: """Log-in Spotify and return Auth/token info.""" # return existing token if we have one in memory - if self._auth_info and ( - self._auth_info["expires_at"] > (time.time() - 600 if force_fresh else 300) + if ( + not force_refresh + and self._auth_info + and (self._auth_info["expires_at"] > (time.time() - 600)) ): return self._auth_info # request new access token using the refresh token -- 2.34.1