From fbd99096e6562e63cad0c723cd3769a0de5808bd Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Sun, 8 Sep 2024 11:48:10 +0200 Subject: [PATCH] Fix playback get stuck when there is a stream error on a single track (#1651) --- .../common/models/streamdetails.py | 3 + .../server/controllers/player_queues.py | 16 +- music_assistant/server/controllers/streams.py | 132 ++++----------- music_assistant/server/helpers/audio.py | 154 +++++++++++++++++- music_assistant/server/helpers/util.py | 26 ++- .../server/providers/qobuz/__init__.py | 2 + .../server/providers/spotify/__init__.py | 135 ++++++++------- 7 files changed, 291 insertions(+), 177 deletions(-) diff --git a/music_assistant/common/models/streamdetails.py b/music_assistant/common/models/streamdetails.py index f8caebb4..c80a1f67 100644 --- a/music_assistant/common/models/streamdetails.py +++ b/music_assistant/common/models/streamdetails.py @@ -60,6 +60,9 @@ class StreamDetails(DataClassDictMixin): seconds_streamed: float | None = None target_loudness: float | None = None bypass_loudness_normalization: bool = False + strip_silence_begin: bool = False + strip_silence_end: bool = False + stream_error: bool | None = None def __str__(self) -> str: """Return pretty printable string of object.""" diff --git a/music_assistant/server/controllers/player_queues.py b/music_assistant/server/controllers/player_queues.py index 0fe92f16..9fdc0407 100644 --- a/music_assistant/server/controllers/player_queues.py +++ b/music_assistant/server/controllers/player_queues.py @@ -35,7 +35,12 @@ from music_assistant.common.models.player import PlayerMedia from music_assistant.common.models.player_queue import PlayerQueue from music_assistant.common.models.queue_item import QueueItem from music_assistant.common.models.streamdetails import StreamDetails -from music_assistant.constants import CONF_FLOW_MODE, FALLBACK_DURATION, MASS_LOGO_ONLINE +from music_assistant.constants import ( + CONF_CROSSFADE, + CONF_FLOW_MODE, + FALLBACK_DURATION, + MASS_LOGO_ONLINE, +) from music_assistant.server.helpers.api import api_command from music_assistant.server.helpers.audio import get_stream_details from music_assistant.server.models.core_controller import CoreController @@ -767,6 +772,10 @@ class PlayerQueuesController(CoreController): queue_item.streamdetails = await get_stream_details( self.mass, queue_item, seek_position=seek_position, fade_in=fade_in ) + # allow stripping silence from the end of the track if crossfade is enabled + # this will allow for smoother crossfades + if await self.mass.config.get_player_config_value(queue_id, CONF_CROSSFADE): + queue_item.streamdetails.strip_silence_end = True # send play_media request to player # NOTE that we debounce this a bit to account for someone hitting the next button # like a madman. This will prevent the player from being overloaded with requests. @@ -1029,6 +1038,11 @@ class PlayerQueuesController(CoreController): # maximum quality of thumbs if queue_item.media_item: queue_item.media_item = await self.mass.music.get_item_by_uri(queue_item.uri) + # allow stripping silence from the begin/end of the track if crossfade is enabled + # this will allow for (much) smoother crossfades + if await self.mass.config.get_player_config_value(queue_id, CONF_CROSSFADE): + queue_item.streamdetails.strip_silence_end = True + queue_item.streamdetails.strip_silence_begin = True # we're all set, this is our next item next_item = queue_item break diff --git a/music_assistant/server/controllers/streams.py b/music_assistant/server/controllers/streams.py index 39d51b3e..c505ee14 100644 --- a/music_assistant/server/controllers/streams.py +++ b/music_assistant/server/controllers/streams.py @@ -8,7 +8,6 @@ the upnp callbacks and json rpc api for slimproto clients. from __future__ import annotations -import asyncio import os import time import urllib.parse @@ -47,7 +46,6 @@ from music_assistant.constants import ( ) from music_assistant.server.helpers.audio import LOGGER as AUDIO_LOGGER from music_assistant.server.helpers.audio import ( - FFMpeg, check_audio_support, crossfade_pcm_parts, get_chunksize, @@ -55,11 +53,10 @@ from music_assistant.server.helpers.audio import ( get_hls_radio_stream, get_hls_substream, get_icy_radio_stream, + get_media_stream, get_player_filter_params, get_silence, get_stream_details, - parse_loudnorm, - strip_silence, ) from music_assistant.server.helpers.util import get_ips from music_assistant.server.helpers.webserver import Webserver @@ -336,7 +333,8 @@ class StreamsController(CoreController): # all checks passed, start streaming! self.logger.debug( - "Start serving audio stream for QueueItem %s to %s", + "Start serving audio stream for QueueItem %s (%s) to %s", + queue_item.name, queue_item.uri, queue.display_name, ) @@ -347,6 +345,7 @@ class StreamsController(CoreController): bit_depth=queue_item.streamdetails.audio_format.bit_depth, channels=2, ) + chunk_num = 0 async for chunk in get_ffmpeg_stream( audio_input=self.get_media_stream( streamdetails=queue_item.streamdetails, @@ -358,8 +357,16 @@ class StreamsController(CoreController): ): try: await resp.write(chunk) + chunk_num += 1 except (BrokenPipeError, ConnectionResetError, ConnectionError): break + if queue_item.streamdetails.stream_error: + self.logger.error( + "Error streaming QueueItem %s (%s) to %s", + queue_item.name, + queue_item.uri, + queue.display_name, + ) if queue.stream_finished is not None: queue.stream_finished = True return resp @@ -586,7 +593,6 @@ class StreamsController(CoreController): use_crossfade, ) total_bytes_sent = 0 - started = time.time() while True: # get (next) queue item to stream @@ -629,14 +635,7 @@ class StreamsController(CoreController): pcm_format=pcm_format, ): # buffer size needs to be big enough to include the crossfade part - # allow it to be a bit smaller when playback just starts - if not use_crossfade: - req_buffer_size = pcm_sample_size * 2 - elif (time.time() - started) > 120: - # additional 5 seconds to strip silence from last part - req_buffer_size = crossfade_size + pcm_sample_size * 5 - else: - req_buffer_size = crossfade_size + req_buffer_size = pcm_sample_size * 2 if not use_crossfade else crossfade_size # ALWAYS APPEND CHUNK TO BUFFER buffer += chunk @@ -647,14 +646,6 @@ class StreamsController(CoreController): #### HANDLE CROSSFADE OF PREVIOUS TRACK AND NEW TRACK if last_fadeout_part: - # strip silence from last part - buffer = await strip_silence( - self.mass, - buffer, - sample_rate=pcm_format.sample_rate, - bit_depth=pcm_format.bit_depth, - reverse=False, - ) # perform crossfade fadein_part = buffer[:crossfade_size] remaining_bytes = buffer[crossfade_size:] @@ -690,14 +681,6 @@ class StreamsController(CoreController): bytes_written += len(last_fadeout_part) last_fadeout_part = b"" if use_crossfade: - # strip silence from last part - buffer = await strip_silence( - self.mass, - buffer, - sample_rate=pcm_format.sample_rate, - bit_depth=pcm_format.bit_depth, - reverse=True, - ) # if crossfade is enabled, save fadeout part to pickup for next track last_fadeout_part = buffer[-crossfade_size:] remaining_bytes = buffer[:-crossfade_size] @@ -772,7 +755,6 @@ class StreamsController(CoreController): pcm_format: AudioFormat, ) -> AsyncGenerator[tuple[bool, bytes], None]: """Get the audio stream for the given streamdetails as raw pcm chunks.""" - logger = self.logger.getChild("media_stream") is_radio = streamdetails.media_type == MediaType.RADIO or not streamdetails.duration if is_radio: streamdetails.seek_position = 0 @@ -810,8 +792,6 @@ class StreamsController(CoreController): ) filter_rule += ":print_format=json" filter_params.append(filter_rule) - if streamdetails.fade_in: - filter_params.append("afade=type=in:start_time=0:duration=3") if streamdetails.stream_type == StreamType.CUSTOM: audio_source = self.mass.get_provider(streamdetails.provider).get_audio_stream( streamdetails, @@ -833,8 +813,14 @@ class StreamsController(CoreController): extra_input_args += ["-decryption_key", streamdetails.decryption_key] else: audio_source = streamdetails.path - if streamdetails.seek_position: - extra_input_args += ["-ss", str(int(streamdetails.seek_position))] + + # handle seek support + if ( + streamdetails.seek_position + and streamdetails.media_type != MediaType.RADIO + and streamdetails.stream_type != StreamType.CUSTOM + ): + extra_input_args += ["-ss", str(int(streamdetails.seek_position))] if streamdetails.media_type == MediaType.RADIO: # pad some silence before the radio stream starts to create some headroom @@ -843,73 +829,15 @@ class StreamsController(CoreController): async for chunk in get_silence(2, pcm_format): yield chunk - logger.debug("start media stream for: %s", streamdetails.uri) - bytes_sent = 0 - finished = False - try: - async with FFMpeg( - audio_input=audio_source, - input_format=streamdetails.audio_format, - output_format=pcm_format, - filter_params=filter_params, - extra_input_args=extra_input_args, - collect_log_history=True, - logger=logger, - ) as ffmpeg_proc: - async for chunk in ffmpeg_proc.iter_chunked(pcm_format.pcm_sample_size): - bytes_sent += len(chunk) - yield chunk - # del chunk - finished = True - finally: - if "ffmpeg_proc" not in locals(): - # edge case: ffmpeg process was not yet started - return # noqa: B012 - if finished and not ffmpeg_proc.closed: - await asyncio.wait_for(ffmpeg_proc.wait(), 60) - elif not ffmpeg_proc.closed: - await ffmpeg_proc.close() - - # try to determine how many seconds we've streamed - seconds_streamed = bytes_sent / pcm_format.pcm_sample_size if bytes_sent else 0 - logger.debug( - "stream %s (with code %s) for %s - seconds streamed: %s", - "finished" if finished else "aborted", - ffmpeg_proc.returncode, - streamdetails.uri, - seconds_streamed, - ) - streamdetails.seconds_streamed = seconds_streamed - # store accurate duration - if finished and not streamdetails.seek_position and seconds_streamed: - streamdetails.duration = seconds_streamed - - # parse loudnorm data if we have that collected - if loudness_details := parse_loudnorm(" ".join(ffmpeg_proc.log_history)): - required_seconds = 600 if streamdetails.media_type == MediaType.RADIO else 120 - if finished or (seconds_streamed >= required_seconds): - logger.debug( - "Loudness measurement for %s: %s", - streamdetails.uri, - loudness_details, - ) - streamdetails.loudness = loudness_details - self.mass.create_task( - self.mass.music.set_track_loudness( - streamdetails.item_id, streamdetails.provider, loudness_details - ) - ) - # report playback - if finished or seconds_streamed > 30: - self.mass.create_task( - self.mass.music.mark_item_played( - streamdetails.media_type, - streamdetails.item_id, - streamdetails.provider, - ) - ) - if music_prov := self.mass.get_provider(streamdetails.provider): - self.mass.create_task(music_prov.on_streamed(streamdetails, seconds_streamed)) + async for chunk in get_media_stream( + self.mass, + streamdetails=streamdetails, + pcm_format=pcm_format, + audio_source=audio_source, + filter_params=filter_params, + extra_input_args=extra_input_args, + ): + yield chunk def _log_request(self, request: web.Request) -> None: """Log request.""" diff --git a/music_assistant/server/helpers/audio.py b/music_assistant/server/helpers/audio.py index da4c3421..62099ff6 100644 --- a/music_assistant/server/helpers/audio.py +++ b/music_assistant/server/helpers/audio.py @@ -194,11 +194,16 @@ class FFMpeg(AsyncProcess): # write EOF once we've reached the end of the input stream await self.write_eof() except Exception as err: + if isinstance(err, asyncio.CancelledError): + return # make sure we dont swallow any exceptions and we bail out # once our audio source fails. - if not isinstance(err, asyncio.CancelledError): - self.logger.exception(err) - await self.close(True) + self.logger.error( + "Stream error: %s", + str(err), + exc_info=err if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else None, + ) + await self.write_eof() async def crossfade_pcm_parts( @@ -413,6 +418,149 @@ async def get_stream_details( return streamdetails +async def get_media_stream( + mass: MusicAssistant, + streamdetails: StreamDetails, + pcm_format: AudioFormat, + audio_source: AsyncGenerator[bytes, None] | str, + filter_params: list[str] | None = None, + extra_input_args: list[str] | None = None, +) -> AsyncGenerator[bytes, None]: + """Get PCM audio stream for given media details.""" + logger = LOGGER.getChild("media_stream") + logger.debug("start media stream for: %s", streamdetails.uri) + strip_silence_begin = streamdetails.strip_silence_begin + strip_silence_end = streamdetails.strip_silence_end + if streamdetails.fade_in: + filter_params.append("afade=type=in:start_time=0:duration=3") + strip_silence_begin = False + bytes_sent = 0 + chunk_number = 0 + buffer: bytes = b"" + finished = False + try: + async with FFMpeg( + audio_input=audio_source, + input_format=streamdetails.audio_format, + output_format=pcm_format, + filter_params=filter_params, + extra_input_args=extra_input_args, + collect_log_history=True, + logger=logger, + ) as ffmpeg_proc: + async for chunk in ffmpeg_proc.iter_chunked(pcm_format.pcm_sample_size): + chunk_number += 1 + # determine buffer size dynamically + if chunk_number < 5 and strip_silence_begin: + req_buffer_size = int(pcm_format.pcm_sample_size * 4) + elif chunk_number > 30 and strip_silence_end: + req_buffer_size = int(pcm_format.pcm_sample_size * 8) + else: + req_buffer_size = int(pcm_format.pcm_sample_size * 2) + + # always append to buffer + buffer += chunk + del chunk + + if len(buffer) < req_buffer_size: + # buffer is not full enough, move on + continue + + if chunk_number == 5 and strip_silence_begin: + # strip silence from begin of audio + chunk = await strip_silence( # noqa: PLW2901 + mass, buffer, pcm_format.sample_rate, pcm_format.bit_depth + ) + bytes_sent += len(chunk) + yield chunk + buffer = b"" + continue + + #### OTHER: enough data in buffer, feed to output + while len(buffer) > req_buffer_size: + yield buffer[: pcm_format.pcm_sample_size] + bytes_sent += pcm_format.pcm_sample_size + buffer = buffer[pcm_format.pcm_sample_size :] + + # end of audio/track reached + if strip_silence_end and buffer: + # strip silence from end of audio + buffer = await strip_silence( + mass, + buffer, + sample_rate=pcm_format.sample_rate, + bit_depth=pcm_format.bit_depth, + reverse=True, + ) + # send remaining bytes in buffer + bytes_sent += len(buffer) + yield buffer + del buffer + + if bytes_sent == 0: + # edge case: no audio data was sent + streamdetails.stream_error = True + finished = False + logger.warning("Stream error on %s", streamdetails.uri) + # we send a bit of silence so players get at least some data + # without it, some players refuse to skip to the next track + async for chunk in get_silence(6, pcm_format): + yield chunk + bytes_sent += len(chunk) + else: + finished = True + finally: + if "ffmpeg_proc" not in locals(): + # edge case: ffmpeg process was not yet started + return # noqa: B012 + if finished and not ffmpeg_proc.closed: + await asyncio.wait_for(ffmpeg_proc.wait(), 60) + elif not ffmpeg_proc.closed: + await ffmpeg_proc.close() + + # try to determine how many seconds we've streamed + seconds_streamed = bytes_sent / pcm_format.pcm_sample_size if bytes_sent else 0 + logger.debug( + "stream %s (with code %s) for %s - seconds streamed: %s", + "finished" if finished else "aborted", + ffmpeg_proc.returncode, + streamdetails.uri, + seconds_streamed, + ) + + streamdetails.seconds_streamed = seconds_streamed + # store accurate duration + if finished and not streamdetails.seek_position and seconds_streamed: + streamdetails.duration = seconds_streamed + + # parse loudnorm data if we have that collected + if loudness_details := parse_loudnorm(" ".join(ffmpeg_proc.log_history)): + required_seconds = 600 if streamdetails.media_type == MediaType.RADIO else 120 + if finished or (seconds_streamed >= required_seconds): + logger.debug( + "Loudness measurement for %s: %s", + streamdetails.uri, + loudness_details, + ) + streamdetails.loudness = loudness_details + mass.create_task( + mass.music.set_track_loudness( + streamdetails.item_id, streamdetails.provider, loudness_details + ) + ) + # report playback + if finished or seconds_streamed > 30: + mass.create_task( + mass.music.mark_item_played( + streamdetails.media_type, + streamdetails.item_id, + streamdetails.provider, + ) + ) + if music_prov := mass.get_provider(streamdetails.provider): + mass.create_task(music_prov.on_streamed(streamdetails, seconds_streamed)) + + def create_wave_header(samplerate=44100, channels=2, bitspersample=16, duration=None): """Generate a wave header from given params.""" # pylint: disable=no-member diff --git a/music_assistant/server/helpers/util.py b/music_assistant/server/helpers/util.py index 58505daa..959b599e 100644 --- a/music_assistant/server/helpers/util.py +++ b/music_assistant/server/helpers/util.py @@ -3,6 +3,7 @@ from __future__ import annotations import asyncio +import functools import importlib import logging import platform @@ -10,12 +11,12 @@ import tempfile import urllib.error import urllib.parse import urllib.request -from collections.abc import Coroutine +from collections.abc import Awaitable, Callable, Coroutine from functools import lru_cache from importlib.metadata import PackageNotFoundError from importlib.metadata import version as pkg_version from types import TracebackType -from typing import TYPE_CHECKING, Self +from typing import TYPE_CHECKING, Any, ParamSpec, Self, TypeVar import ifaddr import memory_tempfile @@ -198,3 +199,24 @@ class TaskManager: if len(self._tasks) > 0: await asyncio.wait(self._tasks) self._tasks.clear() + + +_R = TypeVar("_R") +_P = ParamSpec("_P") + + +def lock( + func: Callable[_P, Awaitable[_R]], +) -> Callable[_P, Coroutine[Any, Any, _R]]: + """Call async function using a Lock.""" + + @functools.wraps(func) + async def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _R: + """Call async function using the throttler with retries.""" + if not (func_lock := getattr(func, "lock", None)): + func_lock = asyncio.Lock() + func.lock = func_lock + async with func_lock: + return await func(*args, **kwargs) + + return wrapper diff --git a/music_assistant/server/providers/qobuz/__init__.py b/music_assistant/server/providers/qobuz/__init__.py index 1ca34487..59f0746e 100644 --- a/music_assistant/server/providers/qobuz/__init__.py +++ b/music_assistant/server/providers/qobuz/__init__.py @@ -53,6 +53,7 @@ from music_assistant.server.helpers.app_vars import app_var # pylint: enable=no-name-in-module from music_assistant.server.helpers.throttle_retry import ThrottlerManager, throttle_with_retries +from music_assistant.server.helpers.util import lock from music_assistant.server.models.music_provider import MusicProvider if TYPE_CHECKING: @@ -699,6 +700,7 @@ class QobuzProvider(MusicProvider): playlist.cache_checksum = str(playlist_obj["updated_at"]) return playlist + @lock async def _auth_token(self): """Login to qobuz and store the token.""" if self._user_auth_info: diff --git a/music_assistant/server/providers/spotify/__init__.py b/music_assistant/server/providers/spotify/__init__.py index 8b0914a1..0bd10b5e 100644 --- a/music_assistant/server/providers/spotify/__init__.py +++ b/music_assistant/server/providers/spotify/__init__.py @@ -20,7 +20,6 @@ from music_assistant.common.models.enums import ( StreamType, ) from music_assistant.common.models.errors import ( - AudioError, LoginFailed, MediaNotFoundError, ResourceTemporarilyUnavailable, @@ -52,6 +51,7 @@ from music_assistant.server.helpers.audio import get_chunksize from music_assistant.server.helpers.auth import AuthenticationHelper from music_assistant.server.helpers.process import AsyncProcess, check_output from music_assistant.server.helpers.throttle_retry import ThrottlerManager, throttle_with_retries +from music_assistant.server.helpers.util import lock from music_assistant.server.models.music_provider import MusicProvider if TYPE_CHECKING: @@ -244,12 +244,11 @@ class SpotifyProvider(MusicProvider): _auth_info: str | None = None _sp_user: dict[str, Any] | None = None _librespot_bin: str | None = None - # rate limiter needs to be specified on provider-level, - # so make it an instance attribute - throttler = ThrottlerManager(rate_limit=1, period=2) + throttler: ThrottlerManager async def handle_async_init(self) -> None: """Handle async initialization of the provider.""" + self.throttler = ThrottlerManager(rate_limit=1, period=2) if self.config.get_value(CONF_CLIENT_ID): # loosen the throttler a bit when a custom client id is used self.throttler.rate_limit = 45 @@ -542,13 +541,15 @@ class SpotifyProvider(MusicProvider): items = await self._get_data(endpoint, seed_tracks=prov_track_id, limit=limit) return [self._parse_track(item) for item in items["tracks"] if (item and item["id"])] - @throttle_with_retries async def get_stream_details(self, item_id: str) -> StreamDetails: """Return the content details for the given track when it will be streamed.""" - # make sure that the token is still valid by just requesting it - await self.login() + # fetch full track details + # this will also check if the track is available for streaming + # and use spotify's track linking feature to serve a substitute track + # if the original track is not available + track = await self.get_track(item_id) return StreamDetails( - item_id=item_id, + item_id=track.item_id, provider=self.instance_id, audio_format=AudioFormat( content_type=ContentType.OGG, @@ -560,49 +561,38 @@ class SpotifyProvider(MusicProvider): self, streamdetails: StreamDetails, seek_position: int = 0 ) -> AsyncGenerator[bytes, None]: """Return the audio stream for the provider item.""" - auth_info = await self.login() + auth_info = await self.login(force_fresh=True) librespot = await self.get_librespot_binary() spotify_uri = f"spotify://track:{streamdetails.item_id}" - for retry in (True, False): - args = [ - librespot, - "-c", - CACHE_DIR, - "-M", - "256M", - "--passthrough", - "-b", - "320", - "--backend", - "pipe", - "--single-track", - spotify_uri, - "--token", - auth_info["access_token"], - ] - if seek_position: - args += ["--start-position", str(int(seek_position))] - chunk_size = get_chunksize(streamdetails.audio_format) - stderr = None if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else False - self.logger.log(VERBOSE_LOG_LEVEL, f"Start streaming {spotify_uri} using librespot") - - async with AsyncProcess( - args, - stdout=True, - stderr=stderr, - name="librespot", - ) as librespot_proc: - async for chunk in librespot_proc.iter_any(chunk_size): - yield chunk - if librespot_proc.returncode == 0: - self.logger.log(VERBOSE_LOG_LEVEL, f"Streaming {spotify_uri} ready.") - break - if not retry: - raise AudioError( - f"Failed to stream {spotify_uri} - error: {librespot_proc.returncode}" - ) - # do one retry attempt - accounting for the fact that the token might have expired - auth_info = await self.login(force_refresh=True) + args = [ + librespot, + "-c", + CACHE_DIR, + "-M", + "256M", + "--passthrough", + "-b", + "320", + "--backend", + "pipe", + "--single-track", + spotify_uri, + "--token", + auth_info["access_token"], + ] + if seek_position: + args += ["--start-position", str(int(seek_position))] + chunk_size = get_chunksize(streamdetails.audio_format) + stderr = None if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else False + self.logger.log(VERBOSE_LOG_LEVEL, f"Start streaming {spotify_uri} using librespot") + async with AsyncProcess( + args, + stdout=True, + stderr=stderr, + name="librespot", + ) as librespot_proc: + async for chunk in librespot_proc.iter_any(chunk_size): + yield chunk def _parse_artist(self, artist_obj): """Parse spotify artist object to generic layout.""" @@ -784,11 +774,12 @@ class SpotifyProvider(MusicProvider): playlist.cache_checksum = str(playlist_obj["snapshot_id"]) return playlist - async def login(self, retry: bool = True, force_refresh: bool = False) -> dict: + @lock + async def login(self, force_fresh: bool = False) -> dict: """Log-in Spotify and return Auth/token info.""" # return existing token if we have one in memory if self._auth_info and ( - self._auth_info["expires_at"] > (time.time() - 1800 if force_refresh else 120) + self._auth_info["expires_at"] > (time.time() - 1800 if force_fresh else 300) ): return self._auth_info # request new access token using the refresh token @@ -801,23 +792,28 @@ class SpotifyProvider(MusicProvider): "refresh_token": refresh_token, "client_id": client_id, } - async with self.mass.http_session.post( - "https://accounts.spotify.com/api/token", data=params - ) as response: - if response.status != 200: - err = await response.text() - if "revoked" in err: - # clear refresh token if it's invalid - self.mass.config.set_raw_provider_config_value( - self.instance_id, CONF_REFRESH_TOKEN, "" - ) - if retry: - await asyncio.sleep(1) - return await self.login(retry=False) - raise LoginFailed(f"Failed to refresh access token: {err}") - auth_info = await response.json() - auth_info["expires_at"] = int(auth_info["expires_in"] + time.time()) - self.logger.debug("Successfully refreshed access token") + for _ in range(2): + async with self.mass.http_session.post( + "https://accounts.spotify.com/api/token", data=params + ) as response: + if response.status != 200: + err = await response.text() + if "revoked" in err: + # clear refresh token if it's invalid + self.mass.config.set_raw_provider_config_value( + self.instance_id, CONF_REFRESH_TOKEN, "" + ) + raise LoginFailed(f"Failed to refresh access token: {err}") + # the token failed to refresh, we allow one retry + await asyncio.sleep(2) + continue + # if we reached this point, the token has been successfully refreshed + auth_info = await response.json() + auth_info["expires_at"] = int(auth_info["expires_in"] + time.time()) + self.logger.debug("Successfully refreshed access token") + break + else: + raise LoginFailed(f"Failed to refresh access token: {err}") # make sure that our updated creds get stored in memory + config self._auth_info = auth_info @@ -854,7 +850,8 @@ class SpotifyProvider(MusicProvider): url = f"https://api.spotify.com/v1/{endpoint}" kwargs["market"] = "from_token" kwargs["country"] = "from_token" - auth_info = kwargs.pop("auth_info", await self.login()) + if not (auth_info := kwargs.pop("auth_info", None)): + auth_info = await self.login() headers = {"Authorization": f'Bearer {auth_info["access_token"]}'} locale = self.mass.metadata.locale.replace("_", "-") language = locale.split("-")[0] -- 2.34.1