Better handling of Spotify stream error (due to token expiration) (#1676)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Fri, 20 Sep 2024 14:27:18 +0000 (16:27 +0200)
committerGitHub <noreply@github.com>
Fri, 20 Sep 2024 14:27:18 +0000 (16:27 +0200)
music_assistant/server/helpers/audio.py
music_assistant/server/helpers/ffmpeg.py
music_assistant/server/providers/spotify/__init__.py

index 9bfe1623a4427c51f1201ffbf37cbd75ae0b4b58..559b13fd73a55e7ad118905ed834f3c6b3d47b10 100644 (file)
@@ -349,31 +349,26 @@ async def get_media_stream(
         bytes_sent += len(buffer)
         yield buffer
         del buffer
+        finished = True
+
+    finally:
+        await ffmpeg_proc.close()
 
         if bytes_sent == 0:
             # edge case: no audio data was sent
             streamdetails.stream_error = True
-            finished = False
+            seconds_streamed = 0
             logger.warning("Stream error on %s", streamdetails.uri)
-            # we send a bit of silence so players get at least some data
-            # without it, some players refuse to skip to the next track
-            async for chunk in get_silence(6, pcm_format):
-                yield chunk
-                bytes_sent += len(chunk)
         else:
-            finished = True
-    finally:
-        await ffmpeg_proc.close()
-
-        # try to determine how many seconds we've streamed
-        seconds_streamed = bytes_sent / pcm_format.pcm_sample_size if bytes_sent else 0
-        logger.debug(
-            "stream %s (with code %s) for %s - seconds streamed: %s",
-            "finished" if finished else "aborted",
-            ffmpeg_proc.returncode,
-            streamdetails.uri,
-            seconds_streamed,
-        )
+            # try to determine how many seconds we've streamed
+            seconds_streamed = bytes_sent / pcm_format.pcm_sample_size if bytes_sent else 0
+            logger.debug(
+                "stream %s (with code %s) for %s - seconds streamed: %s",
+                "finished" if finished else "aborted",
+                ffmpeg_proc.returncode,
+                streamdetails.uri,
+                seconds_streamed,
+            )
 
         streamdetails.seconds_streamed = seconds_streamed
         # store accurate duration
@@ -384,7 +379,7 @@ async def get_media_stream(
         if (
             streamdetails.loudness is None
             and streamdetails.volume_normalization_mode != VolumeNormalizationMode.DISABLED
-            and (finished or (seconds_streamed >= 60))
+            and (finished or (seconds_streamed >= 300))
         ):
             # if dynamic volume normalization is enabled and the entire track is streamed
             # the loudnorm filter will output the measuremeet in the log,
@@ -419,6 +414,8 @@ async def get_media_stream(
                     streamdetails.provider,
                 )
             )
+            # TODO: move this to the queue controller so we report
+            # actual playback time instead of buffered seconds
             if music_prov := mass.get_provider(streamdetails.provider):
                 mass.create_task(music_prov.on_streamed(streamdetails, seconds_streamed))
 
index 490beafe5cb8891db95dd7ee89f25620e44179a1..0aaa9dcf18f1e6a07a2e5ae9dd2fe89579a59bde 100644 (file)
@@ -120,10 +120,14 @@ class FFMpeg(AsyncProcess):
         if TYPE_CHECKING:
             self.audio_input: AsyncGenerator[bytes, None]
         generator_exhausted = False
+        audio_received = False
         try:
             async for chunk in self.audio_input:
+                audio_received = True
                 await self.write(chunk)
             generator_exhausted = True
+            if not audio_received:
+                raise AudioError("No audio data received from source")
         except Exception as err:
             if isinstance(err, asyncio.CancelledError):
                 return
index 4f3b3326ab642354089e1ef3f75c73b0420130bc..7bd49696ea99e20c343e7af32be4f764cd5aa4b9 100644 (file)
@@ -20,6 +20,7 @@ from music_assistant.common.models.enums import (
     StreamType,
 )
 from music_assistant.common.models.errors import (
+    AudioError,
     LoginFailed,
     MediaNotFoundError,
     ResourceTemporarilyUnavailable,
@@ -539,7 +540,6 @@ class SpotifyProvider(MusicProvider):
 
     async def get_stream_details(self, item_id: str) -> StreamDetails:
         """Return the content details for the given track when it will be streamed."""
-        await self.login(force_fresh=True)
         # fetch full track details
         # this will also check if the track is available for streaming
         # and use spotify's track linking feature to serve a substitute track
@@ -558,47 +558,45 @@ class SpotifyProvider(MusicProvider):
         self, streamdetails: StreamDetails, seek_position: int = 0
     ) -> AsyncGenerator[bytes, None]:
         """Return the audio stream for the provider item."""
-        auth_info = await self.login(force_fresh=True)
         librespot = await self.get_librespot_binary()
         spotify_uri = f"spotify://track:{streamdetails.item_id}"
-        args = [
-            librespot,
-            "-c",
-            CACHE_DIR,
-            "-M",
-            "256M",
-            "--passthrough",
-            "-b",
-            "320",
-            "--backend",
-            "pipe",
-            "--single-track",
-            spotify_uri,
-            "--token",
-            auth_info["access_token"],
-        ]
-        if seek_position:
-            args += ["--start-position", str(int(seek_position))]
-        chunk_size = get_chunksize(streamdetails.audio_format)
-        stderr = None if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else False
         self.logger.log(VERBOSE_LOG_LEVEL, f"Start streaming {spotify_uri} using librespot")
-        for attempt in range(1, 3):
+        for attempt in range(1, 4):
+            auth_info = await self.login(force_refresh=attempt == 2)
+            args = [
+                librespot,
+                "-c",
+                CACHE_DIR,
+                "-M",
+                "256M",
+                "--passthrough",
+                "-b",
+                "320",
+                "--backend",
+                "pipe",
+                "--single-track",
+                spotify_uri,
+                "--token",
+                auth_info["access_token"],
+            ]
+            if seek_position:
+                args += ["--start-position", str(int(seek_position))]
+            chunk_size = get_chunksize(streamdetails.audio_format)
+            stderr = None if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else False
             async with AsyncProcess(
                 args,
                 stdout=True,
                 stderr=stderr,
                 name="librespot",
             ) as librespot_proc:
-                chunks_received = 0
                 async for chunk in librespot_proc.iter_any(chunk_size):
                     yield chunk
-                    chunks_received += 1
-            if chunks_received:
-                break
-            self.logger.warning(
+            if librespot_proc.returncode == 0:
+                return
+            self.logger.debug(
                 "librespot failed to stream track, retrying... (attempt %s/3)", attempt
             )
-            await asyncio.sleep(0.1)
+        raise AudioError(f"Failed to stream track {spotify_uri} after 3 attempts")
 
     def _parse_artist(self, artist_obj):
         """Parse spotify artist object to generic layout."""
@@ -781,11 +779,13 @@ class SpotifyProvider(MusicProvider):
         return playlist
 
     @lock
-    async def login(self, force_fresh: bool = False) -> dict:
+    async def login(self, force_refresh: bool = False) -> dict:
         """Log-in Spotify and return Auth/token info."""
         # return existing token if we have one in memory
-        if self._auth_info and (
-            self._auth_info["expires_at"] > (time.time() - 600 if force_fresh else 300)
+        if (
+            not force_refresh
+            and self._auth_info
+            and (self._auth_info["expires_at"] > (time.time() - 600))
         ):
             return self._auth_info
         # request new access token using the refresh token