import asyncio
import contextlib
-import logging
import os
import time
from typing import TYPE_CHECKING, Any, cast
from music_assistant.constants import VERBOSE_LOG_LEVEL
from music_assistant.helpers.app_vars import app_var
-from music_assistant.helpers.audio import get_chunksize
from music_assistant.helpers.auth import AuthenticationHelper
from music_assistant.helpers.json import json_loads
from music_assistant.helpers.process import AsyncProcess, check_output
]
if seek_position:
args += ["--start-position", str(int(seek_position))]
- chunk_size = get_chunksize(streamdetails.audio_format)
- stderr = bool(self.logger.isEnabledFor(logging.DEBUG))
- bytes_received = 0
- log_lines: list[str] = []
-
- librespot_proc: AsyncProcess = AsyncProcess(
- args,
- stdout=True,
- stderr=stderr,
- name="librespot",
- )
- try:
- await librespot_proc.start()
-
- async def _read_stderr():
- logger = self.logger.getChild("librespot")
- async for line in librespot_proc.iter_stderr():
- log_lines.append(line)
- logger.log(VERBOSE_LOG_LEVEL, line)
-
- if stderr:
- log_reader = self.mass.create_task(_read_stderr())
-
- async for chunk in librespot_proc.iter_any(chunk_size):
- yield chunk
- bytes_received += len(chunk)
- if stderr:
- await log_reader
-
- if bytes_received == 0:
- raise AudioError("No audio received from librespot")
-
- finally:
- await librespot_proc.close()
- if not bytes_received:
- log_lines = "\n".join(log_lines)
- self.logger.error("Error while streaming track %s\n%s", spotify_uri, log_lines)
+
+ # we retry twice in case librespot fails to start
+ for is_last_attempt in (False, True):
+ librespot_proc: AsyncProcess = AsyncProcess(
+ args,
+ stdout=True,
+ stderr=None if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else False,
+ name="librespot",
+ )
+ try:
+ await librespot_proc.start()
+
+ # get first chunk with timeout, to catch the issue where librespot is not starting
+ # which seems to happen from time to time (but rarely)
+ try:
+ chunk = await asyncio.wait_for(librespot_proc.read(64000), timeout=2)
+ yield chunk
+ except TimeoutError:
+ raise AudioError("No audio received from librespot within timeout")
+
+ # keep yielding chunks until librespot is done
+ async for chunk in librespot_proc.iter_chunked():
+ yield chunk
+
+ # if we reach this point, streaming succeeded and we can break the loop
+ break
+ except (asyncio.CancelledError, GeneratorExit):
+ raise
+ except Exception as e:
+ if is_last_attempt:
+ raise
+ self.logger.error("Error streaming audio: %s - will retry once", str(e))
+ finally:
+ await librespot_proc.close()
def _parse_artist(self, artist_obj):
"""Parse spotify artist object to generic layout."""