from __future__ import annotations
import asyncio
+from collections import deque
from collections.abc import AsyncGenerator
from typing import TYPE_CHECKING
# Regular track/episode streaming - audiobooks are handled in the provider
media_type = "episode" if streamdetails.media_type == MediaType.PODCAST_EPISODE else "track"
spotify_uri = f"spotify://{media_type}:{streamdetails.item_id}"
-
- self.provider.logger.log(
- VERBOSE_LOG_LEVEL, f"Start streaming {spotify_uri} using librespot"
- )
-
async for chunk in self.stream_spotify_uri(spotify_uri, seek_position):
yield chunk
async def stream_spotify_uri(
self, spotify_uri: str, seek_position: int = 0
) -> AsyncGenerator[bytes, None]:
- """Stream a Spotify URI using librespot.
-
- This internal method handles the entire process, from authentication to playback.
- """
+ """Return the audio stream for the Spotify URI."""
+ self.provider.logger.log(
+ VERBOSE_LOG_LEVEL, f"Start streaming {spotify_uri} using librespot"
+ )
# Validate that librespot binary is available
if not self.provider._librespot_bin:
raise AudioError("Librespot binary not available")
if seek_position:
args += ["--start-position", str(int(seek_position))]
- # we retry twice in case librespot fails to start
- for attempt in (1, 2):
- log_librespot = self.provider.logger.isEnabledFor(VERBOSE_LOG_LEVEL) or attempt == 2
- async with AsyncProcess(
- args,
- stdout=True,
- stderr=None if log_librespot else False,
- name="librespot",
- ) as librespot_proc:
- # get first chunk with timeout, to catch the issue where librespot is not starting
- # which seems to happen from time to time (but rarely)
- try:
- chunk = await asyncio.wait_for(librespot_proc.read(64000), timeout=10 * attempt)
- if not chunk:
- raise AudioError(f"No audio data received from librespot for {spotify_uri}")
- yield chunk
- except (TimeoutError, AudioError):
- err_mesg = f"No audio received from librespot within timeout for {spotify_uri}"
- if attempt == 2:
- raise AudioError(err_mesg)
- self.provider.logger.warning("%s - will retry once", err_mesg)
- continue
-
- # keep yielding chunks until librespot is done
+ async with AsyncProcess(
+ args,
+ stdout=True,
+ stderr=True,
+ name="librespot",
+ ) as librespot_proc:
+ log_history: deque[str] = deque(maxlen=10)
+ logger = self.provider.logger
+
+ async def log_librespot_output() -> None:
+ """Log librespot output if verbose logging is enabled."""
+ async for line in librespot_proc.iter_stderr():
+ log_history.append(line)
+ if "ERROR" in line or "WARNING" in line:
+ logger.warning("[librespot] %s", line)
+ else:
+ logger.log(VERBOSE_LOG_LEVEL, "[librespot] %s", line)
+
+ librespot_proc.attach_stderr_reader(asyncio.create_task(log_librespot_output()))
+
+ try:
+ # yield from librespot's stdout
async for chunk in librespot_proc.iter_chunked():
yield chunk
- # if we reach this point, streaming succeeded and we can break the loop
- break
+ if librespot_proc.returncode != 0:
+ raise AudioError(
+ f"Librespot exited with code {librespot_proc.returncode} for {spotify_uri}"
+ )
+
+ except Exception as ex:
+ log_lines_str = "\n".join(log_history)
+ logger.error(
+ "Librespot streaming error for %s: %s\n%s",
+ spotify_uri,
+ ex,
+ log_lines_str,
+ )
+ raise AudioError(f"Error streaming from librespot for {spotify_uri}: {ex}") from ex