seconds_streamed: float | None = None
target_loudness: float | None = None
bypass_loudness_normalization: bool = False
+ strip_silence_begin: bool = False
+ strip_silence_end: bool = False
+ stream_error: bool | None = None
def __str__(self) -> str:
"""Return pretty printable string of object."""
from music_assistant.common.models.player_queue import PlayerQueue
from music_assistant.common.models.queue_item import QueueItem
from music_assistant.common.models.streamdetails import StreamDetails
-from music_assistant.constants import CONF_FLOW_MODE, FALLBACK_DURATION, MASS_LOGO_ONLINE
+from music_assistant.constants import (
+ CONF_CROSSFADE,
+ CONF_FLOW_MODE,
+ FALLBACK_DURATION,
+ MASS_LOGO_ONLINE,
+)
from music_assistant.server.helpers.api import api_command
from music_assistant.server.helpers.audio import get_stream_details
from music_assistant.server.models.core_controller import CoreController
queue_item.streamdetails = await get_stream_details(
self.mass, queue_item, seek_position=seek_position, fade_in=fade_in
)
+ # allow stripping silence from the end of the track if crossfade is enabled
+ # this will allow for smoother crossfades
+ if await self.mass.config.get_player_config_value(queue_id, CONF_CROSSFADE):
+ queue_item.streamdetails.strip_silence_end = True
# send play_media request to player
# NOTE that we debounce this a bit to account for someone hitting the next button
# like a madman. This will prevent the player from being overloaded with requests.
# maximum quality of thumbs
if queue_item.media_item:
queue_item.media_item = await self.mass.music.get_item_by_uri(queue_item.uri)
+ # allow stripping silence from the begin/end of the track if crossfade is enabled
+ # this will allow for (much) smoother crossfades
+ if await self.mass.config.get_player_config_value(queue_id, CONF_CROSSFADE):
+ queue_item.streamdetails.strip_silence_end = True
+ queue_item.streamdetails.strip_silence_begin = True
# we're all set, this is our next item
next_item = queue_item
break
from __future__ import annotations
-import asyncio
import os
import time
import urllib.parse
)
from music_assistant.server.helpers.audio import LOGGER as AUDIO_LOGGER
from music_assistant.server.helpers.audio import (
- FFMpeg,
check_audio_support,
crossfade_pcm_parts,
get_chunksize,
get_hls_radio_stream,
get_hls_substream,
get_icy_radio_stream,
+ get_media_stream,
get_player_filter_params,
get_silence,
get_stream_details,
- parse_loudnorm,
- strip_silence,
)
from music_assistant.server.helpers.util import get_ips
from music_assistant.server.helpers.webserver import Webserver
# all checks passed, start streaming!
self.logger.debug(
- "Start serving audio stream for QueueItem %s to %s",
+ "Start serving audio stream for QueueItem %s (%s) to %s",
+ queue_item.name,
queue_item.uri,
queue.display_name,
)
bit_depth=queue_item.streamdetails.audio_format.bit_depth,
channels=2,
)
+ chunk_num = 0
async for chunk in get_ffmpeg_stream(
audio_input=self.get_media_stream(
streamdetails=queue_item.streamdetails,
):
try:
await resp.write(chunk)
+ chunk_num += 1
except (BrokenPipeError, ConnectionResetError, ConnectionError):
break
+ if queue_item.streamdetails.stream_error:
+ self.logger.error(
+ "Error streaming QueueItem %s (%s) to %s",
+ queue_item.name,
+ queue_item.uri,
+ queue.display_name,
+ )
if queue.stream_finished is not None:
queue.stream_finished = True
return resp
use_crossfade,
)
total_bytes_sent = 0
- started = time.time()
while True:
# get (next) queue item to stream
pcm_format=pcm_format,
):
# buffer size needs to be big enough to include the crossfade part
- # allow it to be a bit smaller when playback just starts
- if not use_crossfade:
- req_buffer_size = pcm_sample_size * 2
- elif (time.time() - started) > 120:
- # additional 5 seconds to strip silence from last part
- req_buffer_size = crossfade_size + pcm_sample_size * 5
- else:
- req_buffer_size = crossfade_size
+ req_buffer_size = pcm_sample_size * 2 if not use_crossfade else crossfade_size
# ALWAYS APPEND CHUNK TO BUFFER
buffer += chunk
#### HANDLE CROSSFADE OF PREVIOUS TRACK AND NEW TRACK
if last_fadeout_part:
- # strip silence from last part
- buffer = await strip_silence(
- self.mass,
- buffer,
- sample_rate=pcm_format.sample_rate,
- bit_depth=pcm_format.bit_depth,
- reverse=False,
- )
# perform crossfade
fadein_part = buffer[:crossfade_size]
remaining_bytes = buffer[crossfade_size:]
bytes_written += len(last_fadeout_part)
last_fadeout_part = b""
if use_crossfade:
- # strip silence from last part
- buffer = await strip_silence(
- self.mass,
- buffer,
- sample_rate=pcm_format.sample_rate,
- bit_depth=pcm_format.bit_depth,
- reverse=True,
- )
# if crossfade is enabled, save fadeout part to pickup for next track
last_fadeout_part = buffer[-crossfade_size:]
remaining_bytes = buffer[:-crossfade_size]
pcm_format: AudioFormat,
) -> AsyncGenerator[tuple[bool, bytes], None]:
"""Get the audio stream for the given streamdetails as raw pcm chunks."""
- logger = self.logger.getChild("media_stream")
is_radio = streamdetails.media_type == MediaType.RADIO or not streamdetails.duration
if is_radio:
streamdetails.seek_position = 0
)
filter_rule += ":print_format=json"
filter_params.append(filter_rule)
- if streamdetails.fade_in:
- filter_params.append("afade=type=in:start_time=0:duration=3")
if streamdetails.stream_type == StreamType.CUSTOM:
audio_source = self.mass.get_provider(streamdetails.provider).get_audio_stream(
streamdetails,
extra_input_args += ["-decryption_key", streamdetails.decryption_key]
else:
audio_source = streamdetails.path
- if streamdetails.seek_position:
- extra_input_args += ["-ss", str(int(streamdetails.seek_position))]
+
+ # handle seek support
+ if (
+ streamdetails.seek_position
+ and streamdetails.media_type != MediaType.RADIO
+ and streamdetails.stream_type != StreamType.CUSTOM
+ ):
+ extra_input_args += ["-ss", str(int(streamdetails.seek_position))]
if streamdetails.media_type == MediaType.RADIO:
# pad some silence before the radio stream starts to create some headroom
async for chunk in get_silence(2, pcm_format):
yield chunk
- logger.debug("start media stream for: %s", streamdetails.uri)
- bytes_sent = 0
- finished = False
- try:
- async with FFMpeg(
- audio_input=audio_source,
- input_format=streamdetails.audio_format,
- output_format=pcm_format,
- filter_params=filter_params,
- extra_input_args=extra_input_args,
- collect_log_history=True,
- logger=logger,
- ) as ffmpeg_proc:
- async for chunk in ffmpeg_proc.iter_chunked(pcm_format.pcm_sample_size):
- bytes_sent += len(chunk)
- yield chunk
- # del chunk
- finished = True
- finally:
- if "ffmpeg_proc" not in locals():
- # edge case: ffmpeg process was not yet started
- return # noqa: B012
- if finished and not ffmpeg_proc.closed:
- await asyncio.wait_for(ffmpeg_proc.wait(), 60)
- elif not ffmpeg_proc.closed:
- await ffmpeg_proc.close()
-
- # try to determine how many seconds we've streamed
- seconds_streamed = bytes_sent / pcm_format.pcm_sample_size if bytes_sent else 0
- logger.debug(
- "stream %s (with code %s) for %s - seconds streamed: %s",
- "finished" if finished else "aborted",
- ffmpeg_proc.returncode,
- streamdetails.uri,
- seconds_streamed,
- )
- streamdetails.seconds_streamed = seconds_streamed
- # store accurate duration
- if finished and not streamdetails.seek_position and seconds_streamed:
- streamdetails.duration = seconds_streamed
-
- # parse loudnorm data if we have that collected
- if loudness_details := parse_loudnorm(" ".join(ffmpeg_proc.log_history)):
- required_seconds = 600 if streamdetails.media_type == MediaType.RADIO else 120
- if finished or (seconds_streamed >= required_seconds):
- logger.debug(
- "Loudness measurement for %s: %s",
- streamdetails.uri,
- loudness_details,
- )
- streamdetails.loudness = loudness_details
- self.mass.create_task(
- self.mass.music.set_track_loudness(
- streamdetails.item_id, streamdetails.provider, loudness_details
- )
- )
- # report playback
- if finished or seconds_streamed > 30:
- self.mass.create_task(
- self.mass.music.mark_item_played(
- streamdetails.media_type,
- streamdetails.item_id,
- streamdetails.provider,
- )
- )
- if music_prov := self.mass.get_provider(streamdetails.provider):
- self.mass.create_task(music_prov.on_streamed(streamdetails, seconds_streamed))
+ async for chunk in get_media_stream(
+ self.mass,
+ streamdetails=streamdetails,
+ pcm_format=pcm_format,
+ audio_source=audio_source,
+ filter_params=filter_params,
+ extra_input_args=extra_input_args,
+ ):
+ yield chunk
def _log_request(self, request: web.Request) -> None:
"""Log request."""
# write EOF once we've reached the end of the input stream
await self.write_eof()
except Exception as err:
+ if isinstance(err, asyncio.CancelledError):
+ return
# make sure we dont swallow any exceptions and we bail out
# once our audio source fails.
- if not isinstance(err, asyncio.CancelledError):
- self.logger.exception(err)
- await self.close(True)
+ self.logger.error(
+ "Stream error: %s",
+ str(err),
+ exc_info=err if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else None,
+ )
+ await self.write_eof()
async def crossfade_pcm_parts(
return streamdetails
+async def get_media_stream(
+ mass: MusicAssistant,
+ streamdetails: StreamDetails,
+ pcm_format: AudioFormat,
+ audio_source: AsyncGenerator[bytes, None] | str,
+ filter_params: list[str] | None = None,
+ extra_input_args: list[str] | None = None,
+) -> AsyncGenerator[bytes, None]:
+ """Get PCM audio stream for given media details."""
+ logger = LOGGER.getChild("media_stream")
+ logger.debug("start media stream for: %s", streamdetails.uri)
+ strip_silence_begin = streamdetails.strip_silence_begin
+ strip_silence_end = streamdetails.strip_silence_end
+ if streamdetails.fade_in:
+ filter_params.append("afade=type=in:start_time=0:duration=3")
+ strip_silence_begin = False
+ bytes_sent = 0
+ chunk_number = 0
+ buffer: bytes = b""
+ finished = False
+ try:
+ async with FFMpeg(
+ audio_input=audio_source,
+ input_format=streamdetails.audio_format,
+ output_format=pcm_format,
+ filter_params=filter_params,
+ extra_input_args=extra_input_args,
+ collect_log_history=True,
+ logger=logger,
+ ) as ffmpeg_proc:
+ async for chunk in ffmpeg_proc.iter_chunked(pcm_format.pcm_sample_size):
+ chunk_number += 1
+ # determine buffer size dynamically
+ if chunk_number < 5 and strip_silence_begin:
+ req_buffer_size = int(pcm_format.pcm_sample_size * 4)
+ elif chunk_number > 30 and strip_silence_end:
+ req_buffer_size = int(pcm_format.pcm_sample_size * 8)
+ else:
+ req_buffer_size = int(pcm_format.pcm_sample_size * 2)
+
+ # always append to buffer
+ buffer += chunk
+ del chunk
+
+ if len(buffer) < req_buffer_size:
+ # buffer is not full enough, move on
+ continue
+
+ if chunk_number == 5 and strip_silence_begin:
+ # strip silence from begin of audio
+ chunk = await strip_silence( # noqa: PLW2901
+ mass, buffer, pcm_format.sample_rate, pcm_format.bit_depth
+ )
+ bytes_sent += len(chunk)
+ yield chunk
+ buffer = b""
+ continue
+
+ #### OTHER: enough data in buffer, feed to output
+ while len(buffer) > req_buffer_size:
+ yield buffer[: pcm_format.pcm_sample_size]
+ bytes_sent += pcm_format.pcm_sample_size
+ buffer = buffer[pcm_format.pcm_sample_size :]
+
+ # end of audio/track reached
+ if strip_silence_end and buffer:
+ # strip silence from end of audio
+ buffer = await strip_silence(
+ mass,
+ buffer,
+ sample_rate=pcm_format.sample_rate,
+ bit_depth=pcm_format.bit_depth,
+ reverse=True,
+ )
+ # send remaining bytes in buffer
+ bytes_sent += len(buffer)
+ yield buffer
+ del buffer
+
+ if bytes_sent == 0:
+ # edge case: no audio data was sent
+ streamdetails.stream_error = True
+ finished = False
+ logger.warning("Stream error on %s", streamdetails.uri)
+ # we send a bit of silence so players get at least some data
+ # without it, some players refuse to skip to the next track
+ async for chunk in get_silence(6, pcm_format):
+ yield chunk
+ bytes_sent += len(chunk)
+ else:
+ finished = True
+ finally:
+ if "ffmpeg_proc" not in locals():
+ # edge case: ffmpeg process was not yet started
+ return # noqa: B012
+ if finished and not ffmpeg_proc.closed:
+ await asyncio.wait_for(ffmpeg_proc.wait(), 60)
+ elif not ffmpeg_proc.closed:
+ await ffmpeg_proc.close()
+
+ # try to determine how many seconds we've streamed
+ seconds_streamed = bytes_sent / pcm_format.pcm_sample_size if bytes_sent else 0
+ logger.debug(
+ "stream %s (with code %s) for %s - seconds streamed: %s",
+ "finished" if finished else "aborted",
+ ffmpeg_proc.returncode,
+ streamdetails.uri,
+ seconds_streamed,
+ )
+
+ streamdetails.seconds_streamed = seconds_streamed
+ # store accurate duration
+ if finished and not streamdetails.seek_position and seconds_streamed:
+ streamdetails.duration = seconds_streamed
+
+ # parse loudnorm data if we have that collected
+ if loudness_details := parse_loudnorm(" ".join(ffmpeg_proc.log_history)):
+ required_seconds = 600 if streamdetails.media_type == MediaType.RADIO else 120
+ if finished or (seconds_streamed >= required_seconds):
+ logger.debug(
+ "Loudness measurement for %s: %s",
+ streamdetails.uri,
+ loudness_details,
+ )
+ streamdetails.loudness = loudness_details
+ mass.create_task(
+ mass.music.set_track_loudness(
+ streamdetails.item_id, streamdetails.provider, loudness_details
+ )
+ )
+ # report playback
+ if finished or seconds_streamed > 30:
+ mass.create_task(
+ mass.music.mark_item_played(
+ streamdetails.media_type,
+ streamdetails.item_id,
+ streamdetails.provider,
+ )
+ )
+ if music_prov := mass.get_provider(streamdetails.provider):
+ mass.create_task(music_prov.on_streamed(streamdetails, seconds_streamed))
+
+
def create_wave_header(samplerate=44100, channels=2, bitspersample=16, duration=None):
"""Generate a wave header from given params."""
# pylint: disable=no-member
from __future__ import annotations
import asyncio
+import functools
import importlib
import logging
import platform
import urllib.error
import urllib.parse
import urllib.request
-from collections.abc import Coroutine
+from collections.abc import Awaitable, Callable, Coroutine
from functools import lru_cache
from importlib.metadata import PackageNotFoundError
from importlib.metadata import version as pkg_version
from types import TracebackType
-from typing import TYPE_CHECKING, Self
+from typing import TYPE_CHECKING, Any, ParamSpec, Self, TypeVar
import ifaddr
import memory_tempfile
if len(self._tasks) > 0:
await asyncio.wait(self._tasks)
self._tasks.clear()
+
+
+_R = TypeVar("_R")
+_P = ParamSpec("_P")
+
+
+def lock(
+ func: Callable[_P, Awaitable[_R]],
+) -> Callable[_P, Coroutine[Any, Any, _R]]:
+ """Call async function using a Lock."""
+
+ @functools.wraps(func)
+ async def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _R:
+ """Call async function using the throttler with retries."""
+ if not (func_lock := getattr(func, "lock", None)):
+ func_lock = asyncio.Lock()
+ func.lock = func_lock
+ async with func_lock:
+ return await func(*args, **kwargs)
+
+ return wrapper
# pylint: enable=no-name-in-module
from music_assistant.server.helpers.throttle_retry import ThrottlerManager, throttle_with_retries
+from music_assistant.server.helpers.util import lock
from music_assistant.server.models.music_provider import MusicProvider
if TYPE_CHECKING:
playlist.cache_checksum = str(playlist_obj["updated_at"])
return playlist
+ @lock
async def _auth_token(self):
"""Login to qobuz and store the token."""
if self._user_auth_info:
StreamType,
)
from music_assistant.common.models.errors import (
- AudioError,
LoginFailed,
MediaNotFoundError,
ResourceTemporarilyUnavailable,
from music_assistant.server.helpers.auth import AuthenticationHelper
from music_assistant.server.helpers.process import AsyncProcess, check_output
from music_assistant.server.helpers.throttle_retry import ThrottlerManager, throttle_with_retries
+from music_assistant.server.helpers.util import lock
from music_assistant.server.models.music_provider import MusicProvider
if TYPE_CHECKING:
_auth_info: str | None = None
_sp_user: dict[str, Any] | None = None
_librespot_bin: str | None = None
- # rate limiter needs to be specified on provider-level,
- # so make it an instance attribute
- throttler = ThrottlerManager(rate_limit=1, period=2)
+ throttler: ThrottlerManager
async def handle_async_init(self) -> None:
"""Handle async initialization of the provider."""
+ self.throttler = ThrottlerManager(rate_limit=1, period=2)
if self.config.get_value(CONF_CLIENT_ID):
# loosen the throttler a bit when a custom client id is used
self.throttler.rate_limit = 45
items = await self._get_data(endpoint, seed_tracks=prov_track_id, limit=limit)
return [self._parse_track(item) for item in items["tracks"] if (item and item["id"])]
- @throttle_with_retries
async def get_stream_details(self, item_id: str) -> StreamDetails:
"""Return the content details for the given track when it will be streamed."""
- # make sure that the token is still valid by just requesting it
- await self.login()
+ # fetch full track details
+ # this will also check if the track is available for streaming
+ # and use spotify's track linking feature to serve a substitute track
+ # if the original track is not available
+ track = await self.get_track(item_id)
return StreamDetails(
- item_id=item_id,
+ item_id=track.item_id,
provider=self.instance_id,
audio_format=AudioFormat(
content_type=ContentType.OGG,
self, streamdetails: StreamDetails, seek_position: int = 0
) -> AsyncGenerator[bytes, None]:
"""Return the audio stream for the provider item."""
- auth_info = await self.login()
+ auth_info = await self.login(force_fresh=True)
librespot = await self.get_librespot_binary()
spotify_uri = f"spotify://track:{streamdetails.item_id}"
- for retry in (True, False):
- args = [
- librespot,
- "-c",
- CACHE_DIR,
- "-M",
- "256M",
- "--passthrough",
- "-b",
- "320",
- "--backend",
- "pipe",
- "--single-track",
- spotify_uri,
- "--token",
- auth_info["access_token"],
- ]
- if seek_position:
- args += ["--start-position", str(int(seek_position))]
- chunk_size = get_chunksize(streamdetails.audio_format)
- stderr = None if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else False
- self.logger.log(VERBOSE_LOG_LEVEL, f"Start streaming {spotify_uri} using librespot")
-
- async with AsyncProcess(
- args,
- stdout=True,
- stderr=stderr,
- name="librespot",
- ) as librespot_proc:
- async for chunk in librespot_proc.iter_any(chunk_size):
- yield chunk
- if librespot_proc.returncode == 0:
- self.logger.log(VERBOSE_LOG_LEVEL, f"Streaming {spotify_uri} ready.")
- break
- if not retry:
- raise AudioError(
- f"Failed to stream {spotify_uri} - error: {librespot_proc.returncode}"
- )
- # do one retry attempt - accounting for the fact that the token might have expired
- auth_info = await self.login(force_refresh=True)
+ args = [
+ librespot,
+ "-c",
+ CACHE_DIR,
+ "-M",
+ "256M",
+ "--passthrough",
+ "-b",
+ "320",
+ "--backend",
+ "pipe",
+ "--single-track",
+ spotify_uri,
+ "--token",
+ auth_info["access_token"],
+ ]
+ if seek_position:
+ args += ["--start-position", str(int(seek_position))]
+ chunk_size = get_chunksize(streamdetails.audio_format)
+ stderr = None if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else False
+ self.logger.log(VERBOSE_LOG_LEVEL, f"Start streaming {spotify_uri} using librespot")
+ async with AsyncProcess(
+ args,
+ stdout=True,
+ stderr=stderr,
+ name="librespot",
+ ) as librespot_proc:
+ async for chunk in librespot_proc.iter_any(chunk_size):
+ yield chunk
def _parse_artist(self, artist_obj):
"""Parse spotify artist object to generic layout."""
playlist.cache_checksum = str(playlist_obj["snapshot_id"])
return playlist
- async def login(self, retry: bool = True, force_refresh: bool = False) -> dict:
+ @lock
+ async def login(self, force_fresh: bool = False) -> dict:
"""Log-in Spotify and return Auth/token info."""
# return existing token if we have one in memory
if self._auth_info and (
- self._auth_info["expires_at"] > (time.time() - 1800 if force_refresh else 120)
+ self._auth_info["expires_at"] > (time.time() - 1800 if force_fresh else 300)
):
return self._auth_info
# request new access token using the refresh token
"refresh_token": refresh_token,
"client_id": client_id,
}
- async with self.mass.http_session.post(
- "https://accounts.spotify.com/api/token", data=params
- ) as response:
- if response.status != 200:
- err = await response.text()
- if "revoked" in err:
- # clear refresh token if it's invalid
- self.mass.config.set_raw_provider_config_value(
- self.instance_id, CONF_REFRESH_TOKEN, ""
- )
- if retry:
- await asyncio.sleep(1)
- return await self.login(retry=False)
- raise LoginFailed(f"Failed to refresh access token: {err}")
- auth_info = await response.json()
- auth_info["expires_at"] = int(auth_info["expires_in"] + time.time())
- self.logger.debug("Successfully refreshed access token")
+ for _ in range(2):
+ async with self.mass.http_session.post(
+ "https://accounts.spotify.com/api/token", data=params
+ ) as response:
+ if response.status != 200:
+ err = await response.text()
+ if "revoked" in err:
+ # clear refresh token if it's invalid
+ self.mass.config.set_raw_provider_config_value(
+ self.instance_id, CONF_REFRESH_TOKEN, ""
+ )
+ raise LoginFailed(f"Failed to refresh access token: {err}")
+ # the token failed to refresh, we allow one retry
+ await asyncio.sleep(2)
+ continue
+ # if we reached this point, the token has been successfully refreshed
+ auth_info = await response.json()
+ auth_info["expires_at"] = int(auth_info["expires_in"] + time.time())
+ self.logger.debug("Successfully refreshed access token")
+ break
+ else:
+ raise LoginFailed(f"Failed to refresh access token: {err}")
# make sure that our updated creds get stored in memory + config
self._auth_info = auth_info
url = f"https://api.spotify.com/v1/{endpoint}"
kwargs["market"] = "from_token"
kwargs["country"] = "from_token"
- auth_info = kwargs.pop("auth_info", await self.login())
+ if not (auth_info := kwargs.pop("auth_info", None)):
+ auth_info = await self.login()
headers = {"Authorization": f'Bearer {auth_info["access_token"]}'}
locale = self.mass.metadata.locale.replace("_", "-")
language = locale.split("-")[0]