Fix playback get stuck when there is a stream error on a single track (#1651)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Sun, 8 Sep 2024 09:48:10 +0000 (11:48 +0200)
committerGitHub <noreply@github.com>
Sun, 8 Sep 2024 09:48:10 +0000 (11:48 +0200)
music_assistant/common/models/streamdetails.py
music_assistant/server/controllers/player_queues.py
music_assistant/server/controllers/streams.py
music_assistant/server/helpers/audio.py
music_assistant/server/helpers/util.py
music_assistant/server/providers/qobuz/__init__.py
music_assistant/server/providers/spotify/__init__.py

index f8caebb424dda6d004404ec7404d65433b106ae0..c80a1f67bc32902a50350123e288a61365edd6ef 100644 (file)
@@ -60,6 +60,9 @@ class StreamDetails(DataClassDictMixin):
     seconds_streamed: float | None = None
     target_loudness: float | None = None
     bypass_loudness_normalization: bool = False
+    strip_silence_begin: bool = False
+    strip_silence_end: bool = False
+    stream_error: bool | None = None
 
     def __str__(self) -> str:
         """Return pretty printable string of object."""
index 0fe92f16f441249d13e6ec5c2f1a3b67e131f33d..9fdc0407a0b333022170515ba8fb197ce932b2c0 100644 (file)
@@ -35,7 +35,12 @@ from music_assistant.common.models.player import PlayerMedia
 from music_assistant.common.models.player_queue import PlayerQueue
 from music_assistant.common.models.queue_item import QueueItem
 from music_assistant.common.models.streamdetails import StreamDetails
-from music_assistant.constants import CONF_FLOW_MODE, FALLBACK_DURATION, MASS_LOGO_ONLINE
+from music_assistant.constants import (
+    CONF_CROSSFADE,
+    CONF_FLOW_MODE,
+    FALLBACK_DURATION,
+    MASS_LOGO_ONLINE,
+)
 from music_assistant.server.helpers.api import api_command
 from music_assistant.server.helpers.audio import get_stream_details
 from music_assistant.server.models.core_controller import CoreController
@@ -767,6 +772,10 @@ class PlayerQueuesController(CoreController):
         queue_item.streamdetails = await get_stream_details(
             self.mass, queue_item, seek_position=seek_position, fade_in=fade_in
         )
+        # allow stripping silence from the end of the track if crossfade is enabled
+        # this will allow for smoother crossfades
+        if await self.mass.config.get_player_config_value(queue_id, CONF_CROSSFADE):
+            queue_item.streamdetails.strip_silence_end = True
         # send play_media request to player
         # NOTE that we debounce this a bit to account for someone hitting the next button
         # like a madman. This will prevent the player from being overloaded with requests.
@@ -1029,6 +1038,11 @@ class PlayerQueuesController(CoreController):
                 # maximum quality of thumbs
                 if queue_item.media_item:
                     queue_item.media_item = await self.mass.music.get_item_by_uri(queue_item.uri)
+                # allow stripping silence from the begin/end of the track if crossfade is enabled
+                # this will allow for (much) smoother crossfades
+                if await self.mass.config.get_player_config_value(queue_id, CONF_CROSSFADE):
+                    queue_item.streamdetails.strip_silence_end = True
+                    queue_item.streamdetails.strip_silence_begin = True
                 # we're all set, this is our next item
                 next_item = queue_item
                 break
index 39d51b3e898dc42eeb00b9859a658c8764d54a6f..c505ee14c868ee674fbdae633e69243d9a23d00f 100644 (file)
@@ -8,7 +8,6 @@ the upnp callbacks and json rpc api for slimproto clients.
 
 from __future__ import annotations
 
-import asyncio
 import os
 import time
 import urllib.parse
@@ -47,7 +46,6 @@ from music_assistant.constants import (
 )
 from music_assistant.server.helpers.audio import LOGGER as AUDIO_LOGGER
 from music_assistant.server.helpers.audio import (
-    FFMpeg,
     check_audio_support,
     crossfade_pcm_parts,
     get_chunksize,
@@ -55,11 +53,10 @@ from music_assistant.server.helpers.audio import (
     get_hls_radio_stream,
     get_hls_substream,
     get_icy_radio_stream,
+    get_media_stream,
     get_player_filter_params,
     get_silence,
     get_stream_details,
-    parse_loudnorm,
-    strip_silence,
 )
 from music_assistant.server.helpers.util import get_ips
 from music_assistant.server.helpers.webserver import Webserver
@@ -336,7 +333,8 @@ class StreamsController(CoreController):
 
         # all checks passed, start streaming!
         self.logger.debug(
-            "Start serving audio stream for QueueItem %s to %s",
+            "Start serving audio stream for QueueItem %s (%s) to %s",
+            queue_item.name,
             queue_item.uri,
             queue.display_name,
         )
@@ -347,6 +345,7 @@ class StreamsController(CoreController):
             bit_depth=queue_item.streamdetails.audio_format.bit_depth,
             channels=2,
         )
+        chunk_num = 0
         async for chunk in get_ffmpeg_stream(
             audio_input=self.get_media_stream(
                 streamdetails=queue_item.streamdetails,
@@ -358,8 +357,16 @@ class StreamsController(CoreController):
         ):
             try:
                 await resp.write(chunk)
+                chunk_num += 1
             except (BrokenPipeError, ConnectionResetError, ConnectionError):
                 break
+        if queue_item.streamdetails.stream_error:
+            self.logger.error(
+                "Error streaming QueueItem %s (%s) to %s",
+                queue_item.name,
+                queue_item.uri,
+                queue.display_name,
+            )
         if queue.stream_finished is not None:
             queue.stream_finished = True
         return resp
@@ -586,7 +593,6 @@ class StreamsController(CoreController):
             use_crossfade,
         )
         total_bytes_sent = 0
-        started = time.time()
 
         while True:
             # get (next) queue item to stream
@@ -629,14 +635,7 @@ class StreamsController(CoreController):
                 pcm_format=pcm_format,
             ):
                 # buffer size needs to be big enough to include the crossfade part
-                # allow it to be a bit smaller when playback just starts
-                if not use_crossfade:
-                    req_buffer_size = pcm_sample_size * 2
-                elif (time.time() - started) > 120:
-                    # additional 5 seconds to strip silence from last part
-                    req_buffer_size = crossfade_size + pcm_sample_size * 5
-                else:
-                    req_buffer_size = crossfade_size
+                req_buffer_size = pcm_sample_size * 2 if not use_crossfade else crossfade_size
 
                 # ALWAYS APPEND CHUNK TO BUFFER
                 buffer += chunk
@@ -647,14 +646,6 @@ class StreamsController(CoreController):
 
                 ####  HANDLE CROSSFADE OF PREVIOUS TRACK AND NEW TRACK
                 if last_fadeout_part:
-                    # strip silence from last part
-                    buffer = await strip_silence(
-                        self.mass,
-                        buffer,
-                        sample_rate=pcm_format.sample_rate,
-                        bit_depth=pcm_format.bit_depth,
-                        reverse=False,
-                    )
                     # perform crossfade
                     fadein_part = buffer[:crossfade_size]
                     remaining_bytes = buffer[crossfade_size:]
@@ -690,14 +681,6 @@ class StreamsController(CoreController):
                 bytes_written += len(last_fadeout_part)
                 last_fadeout_part = b""
             if use_crossfade:
-                # strip silence from last part
-                buffer = await strip_silence(
-                    self.mass,
-                    buffer,
-                    sample_rate=pcm_format.sample_rate,
-                    bit_depth=pcm_format.bit_depth,
-                    reverse=True,
-                )
                 # if crossfade is enabled, save fadeout part to pickup for next track
                 last_fadeout_part = buffer[-crossfade_size:]
                 remaining_bytes = buffer[:-crossfade_size]
@@ -772,7 +755,6 @@ class StreamsController(CoreController):
         pcm_format: AudioFormat,
     ) -> AsyncGenerator[tuple[bool, bytes], None]:
         """Get the audio stream for the given streamdetails as raw pcm chunks."""
-        logger = self.logger.getChild("media_stream")
         is_radio = streamdetails.media_type == MediaType.RADIO or not streamdetails.duration
         if is_radio:
             streamdetails.seek_position = 0
@@ -810,8 +792,6 @@ class StreamsController(CoreController):
                 )
             filter_rule += ":print_format=json"
             filter_params.append(filter_rule)
-        if streamdetails.fade_in:
-            filter_params.append("afade=type=in:start_time=0:duration=3")
         if streamdetails.stream_type == StreamType.CUSTOM:
             audio_source = self.mass.get_provider(streamdetails.provider).get_audio_stream(
                 streamdetails,
@@ -833,8 +813,14 @@ class StreamsController(CoreController):
             extra_input_args += ["-decryption_key", streamdetails.decryption_key]
         else:
             audio_source = streamdetails.path
-            if streamdetails.seek_position:
-                extra_input_args += ["-ss", str(int(streamdetails.seek_position))]
+
+        # handle seek support
+        if (
+            streamdetails.seek_position
+            and streamdetails.media_type != MediaType.RADIO
+            and streamdetails.stream_type != StreamType.CUSTOM
+        ):
+            extra_input_args += ["-ss", str(int(streamdetails.seek_position))]
 
         if streamdetails.media_type == MediaType.RADIO:
             # pad some silence before the radio stream starts to create some headroom
@@ -843,73 +829,15 @@ class StreamsController(CoreController):
             async for chunk in get_silence(2, pcm_format):
                 yield chunk
 
-        logger.debug("start media stream for: %s", streamdetails.uri)
-        bytes_sent = 0
-        finished = False
-        try:
-            async with FFMpeg(
-                audio_input=audio_source,
-                input_format=streamdetails.audio_format,
-                output_format=pcm_format,
-                filter_params=filter_params,
-                extra_input_args=extra_input_args,
-                collect_log_history=True,
-                logger=logger,
-            ) as ffmpeg_proc:
-                async for chunk in ffmpeg_proc.iter_chunked(pcm_format.pcm_sample_size):
-                    bytes_sent += len(chunk)
-                    yield chunk
-                    # del chunk
-                finished = True
-        finally:
-            if "ffmpeg_proc" not in locals():
-                # edge case: ffmpeg process was not yet started
-                return  # noqa: B012
-            if finished and not ffmpeg_proc.closed:
-                await asyncio.wait_for(ffmpeg_proc.wait(), 60)
-            elif not ffmpeg_proc.closed:
-                await ffmpeg_proc.close()
-
-            # try to determine how many seconds we've streamed
-            seconds_streamed = bytes_sent / pcm_format.pcm_sample_size if bytes_sent else 0
-            logger.debug(
-                "stream %s (with code %s) for %s - seconds streamed: %s",
-                "finished" if finished else "aborted",
-                ffmpeg_proc.returncode,
-                streamdetails.uri,
-                seconds_streamed,
-            )
-            streamdetails.seconds_streamed = seconds_streamed
-            # store accurate duration
-            if finished and not streamdetails.seek_position and seconds_streamed:
-                streamdetails.duration = seconds_streamed
-
-            # parse loudnorm data if we have that collected
-            if loudness_details := parse_loudnorm(" ".join(ffmpeg_proc.log_history)):
-                required_seconds = 600 if streamdetails.media_type == MediaType.RADIO else 120
-                if finished or (seconds_streamed >= required_seconds):
-                    logger.debug(
-                        "Loudness measurement for %s: %s",
-                        streamdetails.uri,
-                        loudness_details,
-                    )
-                    streamdetails.loudness = loudness_details
-                    self.mass.create_task(
-                        self.mass.music.set_track_loudness(
-                            streamdetails.item_id, streamdetails.provider, loudness_details
-                        )
-                    )
-            # report playback
-            if finished or seconds_streamed > 30:
-                self.mass.create_task(
-                    self.mass.music.mark_item_played(
-                        streamdetails.media_type,
-                        streamdetails.item_id,
-                        streamdetails.provider,
-                    )
-                )
-                if music_prov := self.mass.get_provider(streamdetails.provider):
-                    self.mass.create_task(music_prov.on_streamed(streamdetails, seconds_streamed))
+        async for chunk in get_media_stream(
+            self.mass,
+            streamdetails=streamdetails,
+            pcm_format=pcm_format,
+            audio_source=audio_source,
+            filter_params=filter_params,
+            extra_input_args=extra_input_args,
+        ):
+            yield chunk
 
     def _log_request(self, request: web.Request) -> None:
         """Log request."""
index da4c342198f9d5abafb94a75e63de497b278d2a1..62099ff6ea409a66bdbedcf8daa436d92ea036a4 100644 (file)
@@ -194,11 +194,16 @@ class FFMpeg(AsyncProcess):
             # write EOF once we've reached the end of the input stream
             await self.write_eof()
         except Exception as err:
+            if isinstance(err, asyncio.CancelledError):
+                return
             # make sure we dont swallow any exceptions and we bail out
             # once our audio source fails.
-            if not isinstance(err, asyncio.CancelledError):
-                self.logger.exception(err)
-                await self.close(True)
+            self.logger.error(
+                "Stream error: %s",
+                str(err),
+                exc_info=err if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else None,
+            )
+            await self.write_eof()
 
 
 async def crossfade_pcm_parts(
@@ -413,6 +418,149 @@ async def get_stream_details(
     return streamdetails
 
 
+async def get_media_stream(
+    mass: MusicAssistant,
+    streamdetails: StreamDetails,
+    pcm_format: AudioFormat,
+    audio_source: AsyncGenerator[bytes, None] | str,
+    filter_params: list[str] | None = None,
+    extra_input_args: list[str] | None = None,
+) -> AsyncGenerator[bytes, None]:
+    """Get PCM audio stream for given media details."""
+    logger = LOGGER.getChild("media_stream")
+    logger.debug("start media stream for: %s", streamdetails.uri)
+    strip_silence_begin = streamdetails.strip_silence_begin
+    strip_silence_end = streamdetails.strip_silence_end
+    if streamdetails.fade_in:
+        filter_params.append("afade=type=in:start_time=0:duration=3")
+        strip_silence_begin = False
+    bytes_sent = 0
+    chunk_number = 0
+    buffer: bytes = b""
+    finished = False
+    try:
+        async with FFMpeg(
+            audio_input=audio_source,
+            input_format=streamdetails.audio_format,
+            output_format=pcm_format,
+            filter_params=filter_params,
+            extra_input_args=extra_input_args,
+            collect_log_history=True,
+            logger=logger,
+        ) as ffmpeg_proc:
+            async for chunk in ffmpeg_proc.iter_chunked(pcm_format.pcm_sample_size):
+                chunk_number += 1
+                # determine buffer size dynamically
+                if chunk_number < 5 and strip_silence_begin:
+                    req_buffer_size = int(pcm_format.pcm_sample_size * 4)
+                elif chunk_number > 30 and strip_silence_end:
+                    req_buffer_size = int(pcm_format.pcm_sample_size * 8)
+                else:
+                    req_buffer_size = int(pcm_format.pcm_sample_size * 2)
+
+                # always append to buffer
+                buffer += chunk
+                del chunk
+
+                if len(buffer) < req_buffer_size:
+                    # buffer is not full enough, move on
+                    continue
+
+                if chunk_number == 5 and strip_silence_begin:
+                    # strip silence from begin of audio
+                    chunk = await strip_silence(  # noqa: PLW2901
+                        mass, buffer, pcm_format.sample_rate, pcm_format.bit_depth
+                    )
+                    bytes_sent += len(chunk)
+                    yield chunk
+                    buffer = b""
+                    continue
+
+                #### OTHER: enough data in buffer, feed to output
+                while len(buffer) > req_buffer_size:
+                    yield buffer[: pcm_format.pcm_sample_size]
+                    bytes_sent += pcm_format.pcm_sample_size
+                    buffer = buffer[pcm_format.pcm_sample_size :]
+
+            # end of audio/track reached
+            if strip_silence_end and buffer:
+                # strip silence from end of audio
+                buffer = await strip_silence(
+                    mass,
+                    buffer,
+                    sample_rate=pcm_format.sample_rate,
+                    bit_depth=pcm_format.bit_depth,
+                    reverse=True,
+                )
+            # send remaining bytes in buffer
+            bytes_sent += len(buffer)
+            yield buffer
+            del buffer
+
+            if bytes_sent == 0:
+                # edge case: no audio data was sent
+                streamdetails.stream_error = True
+                finished = False
+                logger.warning("Stream error on %s", streamdetails.uri)
+                # we send a bit of silence so players get at least some data
+                # without it, some players refuse to skip to the next track
+                async for chunk in get_silence(6, pcm_format):
+                    yield chunk
+                    bytes_sent += len(chunk)
+            else:
+                finished = True
+    finally:
+        if "ffmpeg_proc" not in locals():
+            # edge case: ffmpeg process was not yet started
+            return  # noqa: B012
+        if finished and not ffmpeg_proc.closed:
+            await asyncio.wait_for(ffmpeg_proc.wait(), 60)
+        elif not ffmpeg_proc.closed:
+            await ffmpeg_proc.close()
+
+        # try to determine how many seconds we've streamed
+        seconds_streamed = bytes_sent / pcm_format.pcm_sample_size if bytes_sent else 0
+        logger.debug(
+            "stream %s (with code %s) for %s - seconds streamed: %s",
+            "finished" if finished else "aborted",
+            ffmpeg_proc.returncode,
+            streamdetails.uri,
+            seconds_streamed,
+        )
+
+        streamdetails.seconds_streamed = seconds_streamed
+        # store accurate duration
+        if finished and not streamdetails.seek_position and seconds_streamed:
+            streamdetails.duration = seconds_streamed
+
+        # parse loudnorm data if we have that collected
+        if loudness_details := parse_loudnorm(" ".join(ffmpeg_proc.log_history)):
+            required_seconds = 600 if streamdetails.media_type == MediaType.RADIO else 120
+            if finished or (seconds_streamed >= required_seconds):
+                logger.debug(
+                    "Loudness measurement for %s: %s",
+                    streamdetails.uri,
+                    loudness_details,
+                )
+                streamdetails.loudness = loudness_details
+                mass.create_task(
+                    mass.music.set_track_loudness(
+                        streamdetails.item_id, streamdetails.provider, loudness_details
+                    )
+                )
+        # report playback
+        if finished or seconds_streamed > 30:
+            mass.create_task(
+                mass.music.mark_item_played(
+                    streamdetails.media_type,
+                    streamdetails.item_id,
+                    streamdetails.provider,
+                )
+            )
+            if music_prov := mass.get_provider(streamdetails.provider):
+                mass.create_task(music_prov.on_streamed(streamdetails, seconds_streamed))
+
+
 def create_wave_header(samplerate=44100, channels=2, bitspersample=16, duration=None):
     """Generate a wave header from given params."""
     # pylint: disable=no-member
index 58505daae9375b461ce8128ed24d2ba48af32609..959b599eb56b13b1c2bc6a41752ed5b85abda331 100644 (file)
@@ -3,6 +3,7 @@
 from __future__ import annotations
 
 import asyncio
+import functools
 import importlib
 import logging
 import platform
@@ -10,12 +11,12 @@ import tempfile
 import urllib.error
 import urllib.parse
 import urllib.request
-from collections.abc import Coroutine
+from collections.abc import Awaitable, Callable, Coroutine
 from functools import lru_cache
 from importlib.metadata import PackageNotFoundError
 from importlib.metadata import version as pkg_version
 from types import TracebackType
-from typing import TYPE_CHECKING, Self
+from typing import TYPE_CHECKING, Any, ParamSpec, Self, TypeVar
 
 import ifaddr
 import memory_tempfile
@@ -198,3 +199,24 @@ class TaskManager:
         if len(self._tasks) > 0:
             await asyncio.wait(self._tasks)
             self._tasks.clear()
+
+
+_R = TypeVar("_R")
+_P = ParamSpec("_P")
+
+
+def lock(
+    func: Callable[_P, Awaitable[_R]],
+) -> Callable[_P, Coroutine[Any, Any, _R]]:
+    """Call async function using a Lock."""
+
+    @functools.wraps(func)
+    async def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _R:
+        """Call async function using the throttler with retries."""
+        if not (func_lock := getattr(func, "lock", None)):
+            func_lock = asyncio.Lock()
+            func.lock = func_lock
+        async with func_lock:
+            return await func(*args, **kwargs)
+
+    return wrapper
index 1ca3448703583932b1d2c4a9baa60bc2cffef374..59f0746e12468de9d8151c574887a34d5f09578e 100644 (file)
@@ -53,6 +53,7 @@ from music_assistant.server.helpers.app_vars import app_var
 
 # pylint: enable=no-name-in-module
 from music_assistant.server.helpers.throttle_retry import ThrottlerManager, throttle_with_retries
+from music_assistant.server.helpers.util import lock
 from music_assistant.server.models.music_provider import MusicProvider
 
 if TYPE_CHECKING:
@@ -699,6 +700,7 @@ class QobuzProvider(MusicProvider):
         playlist.cache_checksum = str(playlist_obj["updated_at"])
         return playlist
 
+    @lock
     async def _auth_token(self):
         """Login to qobuz and store the token."""
         if self._user_auth_info:
index 8b0914a12e340eec91803678b3b3ccc82543e1e7..0bd10b5e902bfe6dea9ac46c0a7f362d3a25e535 100644 (file)
@@ -20,7 +20,6 @@ from music_assistant.common.models.enums import (
     StreamType,
 )
 from music_assistant.common.models.errors import (
-    AudioError,
     LoginFailed,
     MediaNotFoundError,
     ResourceTemporarilyUnavailable,
@@ -52,6 +51,7 @@ from music_assistant.server.helpers.audio import get_chunksize
 from music_assistant.server.helpers.auth import AuthenticationHelper
 from music_assistant.server.helpers.process import AsyncProcess, check_output
 from music_assistant.server.helpers.throttle_retry import ThrottlerManager, throttle_with_retries
+from music_assistant.server.helpers.util import lock
 from music_assistant.server.models.music_provider import MusicProvider
 
 if TYPE_CHECKING:
@@ -244,12 +244,11 @@ class SpotifyProvider(MusicProvider):
     _auth_info: str | None = None
     _sp_user: dict[str, Any] | None = None
     _librespot_bin: str | None = None
-    # rate limiter needs to be specified on provider-level,
-    # so make it an instance attribute
-    throttler = ThrottlerManager(rate_limit=1, period=2)
+    throttler: ThrottlerManager
 
     async def handle_async_init(self) -> None:
         """Handle async initialization of the provider."""
+        self.throttler = ThrottlerManager(rate_limit=1, period=2)
         if self.config.get_value(CONF_CLIENT_ID):
             # loosen the throttler a bit when a custom client id is used
             self.throttler.rate_limit = 45
@@ -542,13 +541,15 @@ class SpotifyProvider(MusicProvider):
         items = await self._get_data(endpoint, seed_tracks=prov_track_id, limit=limit)
         return [self._parse_track(item) for item in items["tracks"] if (item and item["id"])]
 
-    @throttle_with_retries
     async def get_stream_details(self, item_id: str) -> StreamDetails:
         """Return the content details for the given track when it will be streamed."""
-        # make sure that the token is still valid by just requesting it
-        await self.login()
+        # fetch full track details
+        # this will also check if the track is available for streaming
+        # and use spotify's track linking feature to serve a substitute track
+        # if the original track is not available
+        track = await self.get_track(item_id)
         return StreamDetails(
-            item_id=item_id,
+            item_id=track.item_id,
             provider=self.instance_id,
             audio_format=AudioFormat(
                 content_type=ContentType.OGG,
@@ -560,49 +561,38 @@ class SpotifyProvider(MusicProvider):
         self, streamdetails: StreamDetails, seek_position: int = 0
     ) -> AsyncGenerator[bytes, None]:
         """Return the audio stream for the provider item."""
-        auth_info = await self.login()
+        auth_info = await self.login(force_fresh=True)
         librespot = await self.get_librespot_binary()
         spotify_uri = f"spotify://track:{streamdetails.item_id}"
-        for retry in (True, False):
-            args = [
-                librespot,
-                "-c",
-                CACHE_DIR,
-                "-M",
-                "256M",
-                "--passthrough",
-                "-b",
-                "320",
-                "--backend",
-                "pipe",
-                "--single-track",
-                spotify_uri,
-                "--token",
-                auth_info["access_token"],
-            ]
-            if seek_position:
-                args += ["--start-position", str(int(seek_position))]
-            chunk_size = get_chunksize(streamdetails.audio_format)
-            stderr = None if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else False
-            self.logger.log(VERBOSE_LOG_LEVEL, f"Start streaming {spotify_uri} using librespot")
-
-            async with AsyncProcess(
-                args,
-                stdout=True,
-                stderr=stderr,
-                name="librespot",
-            ) as librespot_proc:
-                async for chunk in librespot_proc.iter_any(chunk_size):
-                    yield chunk
-                if librespot_proc.returncode == 0:
-                    self.logger.log(VERBOSE_LOG_LEVEL, f"Streaming {spotify_uri} ready.")
-                    break
-                if not retry:
-                    raise AudioError(
-                        f"Failed to stream {spotify_uri} - error: {librespot_proc.returncode}"
-                    )
-                # do one retry attempt - accounting for the fact that the token might have expired
-                auth_info = await self.login(force_refresh=True)
+        args = [
+            librespot,
+            "-c",
+            CACHE_DIR,
+            "-M",
+            "256M",
+            "--passthrough",
+            "-b",
+            "320",
+            "--backend",
+            "pipe",
+            "--single-track",
+            spotify_uri,
+            "--token",
+            auth_info["access_token"],
+        ]
+        if seek_position:
+            args += ["--start-position", str(int(seek_position))]
+        chunk_size = get_chunksize(streamdetails.audio_format)
+        stderr = None if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else False
+        self.logger.log(VERBOSE_LOG_LEVEL, f"Start streaming {spotify_uri} using librespot")
+        async with AsyncProcess(
+            args,
+            stdout=True,
+            stderr=stderr,
+            name="librespot",
+        ) as librespot_proc:
+            async for chunk in librespot_proc.iter_any(chunk_size):
+                yield chunk
 
     def _parse_artist(self, artist_obj):
         """Parse spotify artist object to generic layout."""
@@ -784,11 +774,12 @@ class SpotifyProvider(MusicProvider):
         playlist.cache_checksum = str(playlist_obj["snapshot_id"])
         return playlist
 
-    async def login(self, retry: bool = True, force_refresh: bool = False) -> dict:
+    @lock
+    async def login(self, force_fresh: bool = False) -> dict:
         """Log-in Spotify and return Auth/token info."""
         # return existing token if we have one in memory
         if self._auth_info and (
-            self._auth_info["expires_at"] > (time.time() - 1800 if force_refresh else 120)
+            self._auth_info["expires_at"] > (time.time() - 1800 if force_fresh else 300)
         ):
             return self._auth_info
         # request new access token using the refresh token
@@ -801,23 +792,28 @@ class SpotifyProvider(MusicProvider):
             "refresh_token": refresh_token,
             "client_id": client_id,
         }
-        async with self.mass.http_session.post(
-            "https://accounts.spotify.com/api/token", data=params
-        ) as response:
-            if response.status != 200:
-                err = await response.text()
-                if "revoked" in err:
-                    # clear refresh token if it's invalid
-                    self.mass.config.set_raw_provider_config_value(
-                        self.instance_id, CONF_REFRESH_TOKEN, ""
-                    )
-                if retry:
-                    await asyncio.sleep(1)
-                    return await self.login(retry=False)
-                raise LoginFailed(f"Failed to refresh access token: {err}")
-            auth_info = await response.json()
-            auth_info["expires_at"] = int(auth_info["expires_in"] + time.time())
-            self.logger.debug("Successfully refreshed access token")
+        for _ in range(2):
+            async with self.mass.http_session.post(
+                "https://accounts.spotify.com/api/token", data=params
+            ) as response:
+                if response.status != 200:
+                    err = await response.text()
+                    if "revoked" in err:
+                        # clear refresh token if it's invalid
+                        self.mass.config.set_raw_provider_config_value(
+                            self.instance_id, CONF_REFRESH_TOKEN, ""
+                        )
+                        raise LoginFailed(f"Failed to refresh access token: {err}")
+                    # the token failed to refresh, we allow one retry
+                    await asyncio.sleep(2)
+                    continue
+                # if we reached this point, the token has been successfully refreshed
+                auth_info = await response.json()
+                auth_info["expires_at"] = int(auth_info["expires_in"] + time.time())
+                self.logger.debug("Successfully refreshed access token")
+                break
+        else:
+            raise LoginFailed(f"Failed to refresh access token: {err}")
 
         # make sure that our updated creds get stored in memory + config
         self._auth_info = auth_info
@@ -854,7 +850,8 @@ class SpotifyProvider(MusicProvider):
         url = f"https://api.spotify.com/v1/{endpoint}"
         kwargs["market"] = "from_token"
         kwargs["country"] = "from_token"
-        auth_info = kwargs.pop("auth_info", await self.login())
+        if not (auth_info := kwargs.pop("auth_info", None)):
+            auth_info = await self.login()
         headers = {"Authorization": f'Bearer {auth_info["access_token"]}'}
         locale = self.mass.metadata.locale.replace("_", "-")
         language = locale.split("-")[0]